Comments (23)
You may create a db pool, and get each connection from the pool per request like so:
use warp::{self, reject, Filter};
use diesel::pg::PgConnection;
use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
type PgPool = Pool<ConnectionManager<PgConnection>>;
type PooledPg = PooledConnection<ConnectionManager<PgConnection>>;
fn pg_pool() -> PgPool {
let manager = ConnectionManager::<PgConnection>::new("postgres://username:password@localhost/db_name");
Pool::new(manager).expect("Postgres connection pool could not be created")
}
fn main() {
let pool = pg_pool();
// setup the the connection pool to get a connection on each request
let pg = warp::any()
.map(move || pool.clone())
.and_then(|pool: PgPool| match pool.get() {
Ok(conn) => Ok(conn),
Err(_) => Err(reject::server_error()),
});
let hello_world = warp::get(
warp::index()
// use the pg connection on the route
.and(pg)
.map(|db: PooledPg| {
// use the conn
"Hello World!"
}),
);
warp::serve(hello_world).run(([127, 0, 0, 1], 3001));
}
from warp.
@seanmonstar Finchers provides a helper function for building a Future
from a function that may block the current thread. It seems to be good to introduce such a helper function also in warp, in order to avoid confusion like this thread.
from warp.
thank you all
I think its good way to put an example for diesel inside example folder
from warp.
@andy-rust You are copying the code as is. Replace // use the conn
part with your route logic.
I'll update the example with a dummy response.
from warp.
@csharad - thank you very much for the prompt answer! Now in the retrospective, I have to admit the question was stupid, sorry about that.
The only thing it had been swearing at me was "warning: use of deprecated item 'warp::get': warp::get2() is meant to replace get()"
With your permission I'd put your solution with the quick fix for anyone to just grab and run:
use warp::{Filter, reject};
use diesel::pg::PgConnection;
use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
type PgPool = Pool<ConnectionManager<PgConnection>>;
type PooledPg = PooledConnection<ConnectionManager<PgConnection>>;
fn pg_pool() -> PgPool {
let manager = ConnectionManager::<PgConnection>::new("postgres://username:password@localhost/db_name");
Pool::new(manager).expect("Postgres connection pool could not be created")
}
fn main() {
let pool = pg_pool();
let pg = warp::any()
.map(move || pool.clone())
.and_then(|pool: PgPool| match pool.get() {
Ok(conn) => Ok(conn),
Err(_) => Err(reject::server_error()),
});
let index_from_db = warp::path::index()
.and(pg)
.map(|db: PooledPg| {
"Get the data from DB"
});
let routes = warp::get2()
.and(index_from_db);
warp::serve(routes)
.run(([0, 0, 0, 0], 3030));
}
from warp.
Been trying this with warp 2.0
In examples above generating a connection just requires small modification of adding async move
inside and_then
let pg_conn = warp::any().map(move || pool.clone()).and_then(|pool: PgPool| async move{ match pool.get(){
Ok(conn) => Ok(conn),
Err(_) => Err(reject::custom(PgPoolError)),
}});
will edit with any other alterations
from warp.
@dakom apologies. Im in same boat as you.
Using this to test/learn async rust. Nothing you've written sounds incorrect to me, but I'm also noob!
from warp.
I've haven't used diesel yet, so if anyone else wants to help show how to use it here, that'd be awesome!
from warp.
I used Diesel very naively (it doesn't support connection pooling! I need to investigate) but it works.
Just write an etablish_connection
function
use diesel::pg::PgConnection;
use diesel::prelude::*;
use dotenv::dotenv;
use std::env;
pub fn establish_connection() -> PgConnection {
dotenv().ok();
let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
PgConnection::establish(&database_url).expect(&format!("Error connecting to {}", database_url))
}
And used it like this for example:
pub fn retrieve_one_user(id_param: i32) -> Result<impl warp::Reply, warp::Rejection> {
use schema::users::dsl::*;
let connection = establish_connection();
let results = users
.select((id, email, username, created_at, updated_at))
.find(id_param)
.load::<PublicUserData>(&connection)
.optional()
.expect("Error loading user");
match results {
Some(json) => {
if json.is_empty() == true {
return Err(warp::reject::not_found());
}
Ok(warp::reply::json(&json))
}
None => Err(warp::reject::server_error()),
}
}
from warp.
thanks
Appreciated
from warp.
I believe in that case that uses a blocking connection to load from the database, so you'd probably want to move that to a separate thread (or consider wrapping it in tokio_threadpool::blocking
).
from warp.
unfortunately, the example of @csharad does not work for me :(
warp::serve(hello_world)
^^^^^^^^^^^ the trait `warp::reply::sealed::ReplySealed` is not implemented for `()`
from warp.
@andy-rust what if you need to share the pool between multiple routes?
from warp.
Sorry I meant to reply to @csharad :-)
from warp.
@stefanoc Use the pg
filter in other routes like:
let index_route = warp::path::index()
.and(pg.clone())
.map(|db: PooledPg| {
"Get the data from DB"
});
let login_route = path!("login")
.and(pg)
.map(|db: PooledPg| {
"Get the data from DB"
});
from warp.
From a quick glance on this code, I would argue that it is still running on the thread that warp is running on and since Diesel uses blocking operations, it will block warp's thread.
If I am correctly reading this, then all Diesel operations still must be moved to it's own Tokio thread pool as @seanmonstar mentioned above.
If I am wrong about the Diesel internals, please ignore my comment.
from warp.
The diesel operations work on its own thread pool, if r2d2
is used as far as I know. But still it is a blocking operation and does not interact with futures.
from warp.
@csharad you're moving pg
in the first route, so you can't use it in the second route.
from warp.
@stefanoc Ah! yes. It clonable not copyable. Will fix that.
from warp.
@csharad, @stefanoc, @Elod10, @seanmonstar, @carllerche
I have slightly modified the code of @andy-rust to show how to use it with tokio_threadpool::blocking
extern crate tokio_threadpool;
use diesel::pg::PgConnection;
use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
use warp::{reject, Filter};
type PgPool = Pool<ConnectionManager<PgConnection>>;
type PooledPg = PooledConnection<ConnectionManager<PgConnection>>;
fn pg_pool() -> PgPool {
let manager =
ConnectionManager::<PgConnection>::new("postgres://username:password@localhost/db_name");
Pool::new(manager).expect("Postgres connection pool could not be created")
}
fn main() {
let pool = pg_pool();
let pg = warp::any()
.map(move || pool.clone())
.and_then(|pool: PgPool| match pool.get() {
Ok(conn) => Ok(conn),
Err(_) => Err(reject::server_error()),
});
let index_from_db = warp::path::index().and(pg).map(|db: PooledPg| {
tokio_threadpool::blocking(|| {
diesel::insert_into(posts::table)
.values(&NewPost {
title: String::from("Hello world"),
body: String::from("Hope it works"),
})
.execute(&db)
.unwrap();
})
.and_then(|_| Ok("Set data in DB"))
.unwrap()
});
let routes = warp::get2().and(index_from_db);
warp::serve(routes).run(([0, 0, 0, 0], 3030));
But tokio_threadpool::blocking
or diesel
or tokio
has some weird issue (at least on Arch linux) if i benchmark the code above i will get:
wrk -t4 -c1000 -d10s --latency http://127.0.0.1:3000
Running 10s test @ http://127.0.0.1:3000
4 threads and 1000 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 404.88ms 266.96ms 2.00s 70.74%
Req/Sec 542.41 98.39 828.00 72.25%
Latency Distribution
50% 426.95ms
75% 559.05ms
90% 722.69ms
99% 1.21s
21618 requests in 10.06s, 1.98MB read
Socket errors: connect 0, read 0, write 0, timeout 39
Requests/sec: 2148.20
Transfer/sec: 201.39KB
Not bad, but if we add println!("Something")
inside or outside of tokio_threadpool::blocking
block in the code i get benchmark with much lower latency and better Requests/sec:
wrk -t4 -c1000 -d10s --latency http://127.0.0.1:3000
Running 10s test @ http://127.0.0.1:3000
4 threads and 1000 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 268.47ms 175.47ms 2.00s 75.71%
Req/Sec 820.25 217.47 1.33k 62.50%
Latency Distribution
50% 252.55ms
75% 363.05ms
90% 457.58ms
99% 733.74ms
32689 requests in 10.07s, 2.99MB read
Socket errors: connect 0, read 0, write 0, timeout 40
Requests/sec: 3247.02
Transfer/sec: 304.41KB
from warp.
@goriunov The blocking API alone does not work as asynchronous computation as expected. You need to construct an instance of Future
by using poll_fn
as follows:
let db_filter = warp::path::index().and(pg).and_then(|db: PooledPg| {
futures::future::poll_fn(move || {
let result = futures::try_ready!(tokio_threadpool::blocking(|| { /* do some stuff */ }));
result.map(Async::Ready).map_err(internal_server_error)
})
})
.and_then(|_| Ok("Set data in DB"));
from warp.
@ubnt-intrepid Thank you for fixing my code, i am trying to implement it and it looks like that right now.
I have got stuck with result.map(Async::Ready).map_err(e)
handling error part abit.
let index_from_db = warp::path::index()
.and(pg)
.and_then(|db: PooledPg| {
futures::future::poll_fn(move || {
let result = futures::try_ready!(tokio_threadpool::blocking(|| {
let data = diesel::insert_into(posts::table)
.values(&NewPost {
title: String::from("Hello world"),
body: String::from("Hope it works"),
})
.execute(&db);
Ok(Async::Ready)
}));
// need some help to handle error always get compile error :(
result.map(Async::Ready).map_err(e)
})
})
.and_then(|_| Ok("Set data in DB"));
I can se u use internal_server_error
would you mind to show how to implement that part to, Thank you very much :)
from warp.
@ThePianoDentist - thanks for the update :)
If I understand correctly - this will still hold the DB connection across all the await points in the pipeline? e.g. if the next part is an async handler that, as part of its processing, sends off other reqwests or does async-style pausing stuff, the db connection will not be dropped and returned to the pool until the entire pipeline completes?
Of course that might be desirable - having an abstract way to get a connection and use it for a request's lifetime is probably the right thing to do most of the time - but just worth noting if the warp handler itself expects to only need the db part of the time, might be better to pass the PgPool clone all the way down and then only get() as needed within locally scoped blocks?
I'm not entirely sure I have a clear picture of how all this works - so consider this more a question for clarification than anything else :) Thanks!
from warp.
Related Issues (20)
- Feature request: More customization points in tracing HOT 2
- EC Private key support HOT 1
- Make `Option<F>` a filter when F is a filter
- Feature request: `warp::make_service()` or `warp::service_with_addr()`
- CVE-2023-43669/GHSA-9mcr-873m-xcxp: tungstenite <= 0.20.0 DoS vulnerability HOT 1
- [feature request] [low prio] Non-Metal fallback option HOT 2
- SSE gives up on Streams that return Pending HOT 3
- Server::run should return ! (never type)
- Default OS / self signed certificate without create it HOT 1
- websocket disconnect (code: 1006, reason: "") HOT 2
- Add `rust-version` into `Cargo.toml` HOT 1
- Error `connection closed before message completed` after 60s HOT 2
- Websocket connection closes immediately, but not with firefox HOT 4
- Navigation between panes using vim commands HOT 1
- examples/tls/cert.pem has expired
- Upgrade to hyper v1 HOT 4
- key contains no private key while using ecc key HOT 1
- integrate with monoio
- HELP: filters inside .then never get executed HOT 1
- Chinese input method following error in notebook HOT 2
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from warp.