Comments (23)

pepsighan avatar pepsighan commented on May 4, 2024 24

You can create a DB pool and get a connection from it on each request, like so:

use warp::{self, reject, Filter};
use diesel::pg::PgConnection;
use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};

type PgPool = Pool<ConnectionManager<PgConnection>>;
type PooledPg = PooledConnection<ConnectionManager<PgConnection>>;

fn pg_pool() -> PgPool {
    let manager = ConnectionManager::<PgConnection>::new("postgres://username:password@localhost/db_name");
    Pool::new(manager).expect("Postgres connection pool could not be created")
}

fn main() {
    let pool = pg_pool();

    // set up the connection pool to get a connection on each request
    let pg = warp::any()
        .map(move || pool.clone())
        .and_then(|pool: PgPool| match pool.get() {
            Ok(conn) => Ok(conn),
            Err(_) => Err(reject::server_error()),
        });

    let hello_world = warp::get(
        warp::index()
            // use the pg connection on the route
            .and(pg)
            .map(|db: PooledPg| {
              // use the conn

              "Hello World!"
            }),
    );

    warp::serve(hello_world).run(([127, 0, 0, 1], 3001));
}

from warp.

ubnt-intrepid avatar ubnt-intrepid commented on May 4, 2024 4

@seanmonstar Finchers provides a helper function for building a Future from a function that may block the current thread. It would be good to introduce such a helper in warp as well, to avoid confusion like the one in this thread.

adals avatar adals commented on May 4, 2024 2

Thank you all.

I think it would be good to put a Diesel example in the examples folder.

pepsighan avatar pepsighan commented on May 4, 2024 1

@andy-rust You are copying the code as is. Replace the // use the conn part with your route logic.

I'll update the example with a dummy response.

andywwright avatar andywwright commented on May 4, 2024 1

@csharad - thank you very much for the prompt answer! In retrospect, I have to admit the question was stupid; sorry about that.

The only thing the compiler complained about was "warning: use of deprecated item 'warp::get': warp::get2() is meant to replace get()"

With your permission, I'll post your solution with the quick fix so anyone can just grab and run it:

use warp::{Filter, reject};
use diesel::pg::PgConnection;
use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};

type PgPool = Pool<ConnectionManager<PgConnection>>;
type PooledPg = PooledConnection<ConnectionManager<PgConnection>>;

fn pg_pool() -> PgPool {
    let manager = ConnectionManager::<PgConnection>::new("postgres://username:password@localhost/db_name");
    Pool::new(manager).expect("Postgres connection pool could not be created")
}

fn main() {
    let pool = pg_pool();

    let pg = warp::any()
        .map(move || pool.clone())
        .and_then(|pool: PgPool| match pool.get() {
            Ok(conn) => Ok(conn),
            Err(_) => Err(reject::server_error()),
        });

    let index_from_db = warp::path::index()
        .and(pg)
        .map(|db: PooledPg| {
            "Get the data from DB"
        });

    let routes = warp::get2()
        .and(index_from_db);

    warp::serve(routes)
        .run(([0, 0, 0, 0], 3030));
}

ThePianoDentist avatar ThePianoDentist commented on May 4, 2024 1

I've been trying this with warp 0.2.

In the examples above, getting a connection only requires a small modification: adding async move inside and_then.

    let pg_conn = warp::any()
        .map(move || pool.clone())
        .and_then(|pool: PgPool| async move {
            match pool.get() {
                Ok(conn) => Ok(conn),
                // PgPoolError is a custom rejection type not shown in this comment;
                // it needs to implement warp::reject::Reject
                Err(_) => Err(reject::custom(PgPoolError)),
            }
        });

Will edit with any other alterations.

ThePianoDentist avatar ThePianoDentist commented on May 4, 2024 1

@dakom Apologies, I'm in the same boat as you.

I'm using this to test/learn async Rust. Nothing you've written sounds incorrect to me, but I'm also a noob!

seanmonstar avatar seanmonstar commented on May 4, 2024

I haven't used Diesel yet, so if anyone else wants to help show how to use it here, that'd be awesome!

dd10-e avatar dd10-e commented on May 4, 2024

I used Diesel very naively (my approach doesn't use connection pooling! I need to investigate), but it works.

Just write an establish_connection function:

use diesel::pg::PgConnection;
use diesel::prelude::*;
use dotenv::dotenv;
use std::env;

pub fn establish_connection() -> PgConnection {
    dotenv().ok();

    let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
    PgConnection::establish(&database_url).expect(&format!("Error connecting to {}", database_url))
}

And use it like this, for example:

pub fn retrieve_one_user(id_param: i32) -> Result<impl warp::Reply, warp::Rejection> {
    use schema::users::dsl::*;

    let connection = establish_connection();
    let results = users
        .select((id, email, username, created_at, updated_at))
        .find(id_param)
        .load::<PublicUserData>(&connection)
        .optional()
        .expect("Error loading user");

    match results {
        Some(json) => {
            if json.is_empty() {
                return Err(warp::reject::not_found());
            }
            Ok(warp::reply::json(&json))
        }
        None => Err(warp::reject::server_error()),
    }
}

adals avatar adals commented on May 4, 2024

thanks
Appreciated

seanmonstar avatar seanmonstar commented on May 4, 2024

I believe in that case that uses a blocking connection to load from the database, so you'd probably want to move that to a separate thread (or consider wrapping it in tokio_threadpool::blocking).
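
To make the "separate thread" suggestion concrete, here is a minimal sketch that uses only the standard library, not warp or tokio_threadpool. The names blocking_query and query_off_thread are hypothetical, and the sleep merely stands in for a blocking Diesel call.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for a blocking Diesel query.
fn blocking_query(id: i32) -> String {
    thread::sleep(Duration::from_millis(50)); // simulate database latency
    format!("user-{}", id)
}

/// Runs `blocking_query` on its own thread and returns a receiver for the
/// result, so the calling thread is free to do other work in the meantime.
fn query_off_thread(id: i32) -> mpsc::Receiver<String> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // A failed send only means the receiver was dropped; ignore it.
        let _ = tx.send(blocking_query(id));
    });
    rx
}

fn main() {
    let pending = query_off_thread(7);
    // The caller is not blocked at this point and could keep serving requests.
    let user = pending.recv().expect("worker thread panicked");
    println!("{}", user);
}
```

In a real server you would probably reuse a fixed set of worker threads instead of spawning one per query; the sketch only shows the hand-off itself.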

andywwright avatar andywwright commented on May 4, 2024

Unfortunately, @csharad's example does not work for me :(

warp::serve(hello_world)
^^^^^^^^^^^ the trait `warp::reply::sealed::ReplySealed` is not implemented for `()`

stefanoc avatar stefanoc commented on May 4, 2024

@andy-rust what if you need to share the pool between multiple routes?

stefanoc avatar stefanoc commented on May 4, 2024

Sorry I meant to reply to @csharad :-)

pepsighan avatar pepsighan commented on May 4, 2024

@stefanoc Use the pg filter in other routes like:

let index_route = warp::path::index()
        .and(pg.clone())
        .map(|db: PooledPg| {
            "Get the data from DB"
        });

let login_route = path!("login")
        .and(pg)
        .map(|db: PooledPg| {
            "Get the data from DB"
        });

algermissen avatar algermissen commented on May 4, 2024

From a quick glance at this code, I would argue that it is still running on the thread that warp is running on, and since Diesel uses blocking operations, it will block warp's thread.

If I am reading this correctly, then all Diesel operations still must be moved to their own Tokio thread pool, as @seanmonstar mentioned above.

If I am wrong about the Diesel internals, please ignore my comment.

pepsighan avatar pepsighan commented on May 4, 2024

As far as I know, the Diesel operations run on their own thread pool if r2d2 is used. Still, they are blocking operations and do not interact with futures.

stefanoc avatar stefanoc commented on May 4, 2024

@csharad you're moving pg in the first route, so you can't use it in the second route.

pepsighan avatar pepsighan commented on May 4, 2024

@stefanoc Ah, yes! It's clonable, not copyable. Will fix that.
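
The move-versus-clone point can be illustrated without warp. This is only a sketch under assumptions: PgFilter and build_route are hypothetical stand-ins for the pg filter and for a route that takes its filter by value.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for the `pg` filter: cheap to clone, but not `Copy`.
#[derive(Clone)]
struct PgFilter {
    pool_name: Arc<String>,
}

/// Hypothetical stand-in for a route that takes ownership of its filter.
fn build_route(path: &str, pg: PgFilter) -> String {
    format!("{} -> {}", path, pg.pool_name)
}

fn main() {
    let pg = PgFilter {
        pool_name: Arc::new(String::from("pg_pool")),
    };

    // Clone for every route except the last one, which may take the original.
    let index_route = build_route("/", pg.clone());
    let login_route = build_route("/login", pg);

    // Using `pg` again here would not compile: it was moved into `login_route`.
    println!("{}\n{}", index_route, login_route);
}
```

Cloning here only bumps a reference count, which is why handing each route its own clone is cheap.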

goriunov avatar goriunov commented on May 4, 2024

@csharad, @stefanoc, @Elod10, @seanmonstar, @carllerche
I have slightly modified @andy-rust's code to show how to use it with tokio_threadpool::blocking:

extern crate tokio_threadpool;

use diesel::pg::PgConnection;
use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
use warp::{reject, Filter};

type PgPool = Pool<ConnectionManager<PgConnection>>;
type PooledPg = PooledConnection<ConnectionManager<PgConnection>>;

fn pg_pool() -> PgPool {
  let manager =
    ConnectionManager::<PgConnection>::new("postgres://username:password@localhost/db_name");
  Pool::new(manager).expect("Postgres connection pool could not be created")
}

fn main() {
  let pool = pg_pool();

  let pg = warp::any()
    .map(move || pool.clone())
    .and_then(|pool: PgPool| match pool.get() {
      Ok(conn) => Ok(conn),
      Err(_) => Err(reject::server_error()),
    });

  let index_from_db = warp::path::index().and(pg).map(|db: PooledPg| {
    tokio_threadpool::blocking(|| {
      diesel::insert_into(posts::table)
        .values(&NewPost {
          title: String::from("Hello world"),
          body: String::from("Hope it works"),
        })
        .execute(&db)
        .unwrap();
    })
    .and_then(|_| Ok("Set data in DB"))
    .unwrap()
  });

  let routes = warp::get2().and(index_from_db);

  warp::serve(routes).run(([0, 0, 0, 0], 3030));
}

But tokio_threadpool::blocking, Diesel, or Tokio has some weird issue (at least on Arch Linux). If I benchmark the code above, I get:

wrk -t4 -c1000 -d10s --latency http://127.0.0.1:3000
Running 10s test @ http://127.0.0.1:3000
  4 threads and 1000 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   404.88ms  266.96ms   2.00s    70.74%
    Req/Sec   542.41     98.39   828.00     72.25%
  Latency Distribution
     50%  426.95ms
     75%  559.05ms
     90%  722.69ms
     99%    1.21s 
  21618 requests in 10.06s, 1.98MB read
  Socket errors: connect 0, read 0, write 0, timeout 39
Requests/sec:   2148.20
Transfer/sec:    201.39KB

Not bad, but if I add println!("Something") inside or outside of the tokio_threadpool::blocking block, the benchmark shows much lower latency and better Requests/sec:

wrk -t4 -c1000 -d10s --latency http://127.0.0.1:3000
Running 10s test @ http://127.0.0.1:3000
  4 threads and 1000 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   268.47ms  175.47ms   2.00s    75.71%
    Req/Sec   820.25    217.47     1.33k    62.50%
  Latency Distribution
     50%  252.55ms
     75%  363.05ms
     90%  457.58ms
     99%  733.74ms
  32689 requests in 10.07s, 2.99MB read
  Socket errors: connect 0, read 0, write 0, timeout 40
Requests/sec:   3247.02
Transfer/sec:    304.41KB

ubnt-intrepid avatar ubnt-intrepid commented on May 4, 2024

@goriunov The blocking API alone does not behave as an asynchronous computation. You need to construct a Future with poll_fn, as follows:

let db_filter = warp::path::index().and(pg).and_then(|db: PooledPg| {
     futures::future::poll_fn(move || {
          let result = futures::try_ready!(tokio_threadpool::blocking(|| { /* do some stuff */ }));
          result.map(Async::Ready).map_err(internal_server_error)
     })
})
.and_then(|_| Ok("Set data in DB"));

goriunov avatar goriunov commented on May 4, 2024

@ubnt-intrepid Thank you for fixing my code. I am trying to implement it, and this is what it looks like right now.
I am a bit stuck on the error handling part, result.map(Async::Ready).map_err(e).

 let index_from_db = warp::path::index()
    .and(pg)
    .and_then(|db: PooledPg| {
      futures::future::poll_fn(move || {
        let result = futures::try_ready!(tokio_threadpool::blocking(|| {
          let data = diesel::insert_into(posts::table)
            .values(&NewPost {
              title: String::from("Hello world"),
              body: String::from("Hope it works"),
            })
            .execute(&db);

          Ok(Async::Ready)
        }));
         // need some help to handle error always get compile error :(
        result.map(Async::Ready).map_err(e)
      })
    })
    .and_then(|_| Ok("Set data in DB"));

I can see you use internal_server_error. Would you mind showing how to implement that part too? Thank you very much :)

dakom avatar dakom commented on May 4, 2024

@ThePianoDentist - thanks for the update :)

If I understand correctly - this will still hold the DB connection across all the await points in the pipeline? e.g. if the next part is an async handler that, as part of its processing, sends off other reqwests or does async-style pausing stuff, the db connection will not be dropped and returned to the pool until the entire pipeline completes?

Of course that might be desirable - having an abstract way to get a connection and use it for a request's lifetime is probably the right thing to do most of the time - but just worth noting if the warp handler itself expects to only need the db part of the time, might be better to pass the PgPool clone all the way down and then only get() as needed within locally scoped blocks?

I'm not entirely sure I have a clear picture of how all this works - so consider this more a question for clarification than anything else :) Thanks!
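
A toy model may help picture the trade-off described above. The sketch below is not r2d2: ToyPool, ToyConn, and handler are hypothetical names, and the pool only counts idle connections. It assumes, as with r2d2's pooled connections, that dropping the guard returns the connection to the pool.

```rust
use std::sync::{Arc, Mutex};

/// Toy pool (hypothetical, not r2d2): it only tracks how many connections are idle.
#[derive(Clone)]
struct ToyPool {
    idle: Arc<Mutex<usize>>,
}

/// Guard for a checked-out connection; dropping it returns the connection.
struct ToyConn {
    idle: Arc<Mutex<usize>>,
}

impl ToyPool {
    fn new(size: usize) -> Self {
        ToyPool { idle: Arc::new(Mutex::new(size)) }
    }

    /// Checks out a connection, or returns `None` if the pool is exhausted.
    fn get(&self) -> Option<ToyConn> {
        let mut idle = self.idle.lock().unwrap();
        if *idle == 0 {
            return None;
        }
        *idle -= 1;
        Some(ToyConn { idle: Arc::clone(&self.idle) })
    }

    fn idle_count(&self) -> usize {
        *self.idle.lock().unwrap()
    }
}

impl Drop for ToyConn {
    fn drop(&mut self) {
        *self.idle.lock().unwrap() += 1;
    }
}

/// Handler that needs the database for only part of its work. Returns the idle
/// count seen while the connection is held and after it has been released.
fn handler(pool: &ToyPool) -> (usize, usize) {
    let during = {
        let _conn = pool.get().expect("pool exhausted");
        pool.idle_count() // the connection is checked out here
    }; // `_conn` is dropped at the end of this block and goes back to the pool
    // Slow non-database work (for example an outbound HTTP call) would go here.
    (during, pool.idle_count())
}

fn main() {
    let pool = ToyPool::new(2);
    let (during, after) = handler(&pool);
    println!("idle while querying: {}, idle afterwards: {}", during, after);
}
```

Passing the pool down and calling get() inside a local block keeps the checkout short. A filter that hands the handler a connection up front instead keeps it checked out until the handler drops it.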
