Common R2DBC Resources
This repository contains common resources for R2DBC projects, in particular:
Connection Pooling for Reactive Relational Database Connectivity
Home Page: https://r2dbc.io
License: Apache License 2.0
The Surefire configuration contains an invalid test name pattern, so tests aren't executed during the build.
PoolMetricsRecorder removed Clock from its interface.
We should add a meaningful toString() method to io.r2dbc.pool.ConnectionPool to expose useful debug information such as the driver name.
We should move validation from the releaseHandler to the actual acquisition phase. If connection validation fails, we can simply invalidate the PooledRef. Along with this behavior, we should have at least one retry (the number of attempts should be configurable) to ensure smooth operation when a broken connection sits in the pool.
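A sketch of what acquisition-time validation with a configurable retry could look like on the configuration builder; the `acquireRetry(…)` semantics assumed here (number of re-attempts after a failed acquisition) are this proposal's, and the URL is a placeholder:

```java
import io.r2dbc.pool.ConnectionPool;
import io.r2dbc.pool.ConnectionPoolConfiguration;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;

// Placeholder URL; the validation query depends on the database.
ConnectionFactory connectionFactory =
        ConnectionFactories.get("r2dbc:postgresql://user:password@host/database");

ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration
        .builder(connectionFactory)
        .validationQuery("SELECT 1") // would run on acquisition under this proposal
        .acquireRetry(2)             // assumed knob: re-attempts after a failed acquisition
        .build();
ConnectionPool pool = new ConnectionPool(configuration);
```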
When I try to create a new connection pool that should hold at most 5 connections, I get an exception because the default initial size is 10.
This is an example that triggers the error:
final ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
        .option(DRIVER, "pool")
        .option(PROTOCOL, "postgresql")
        .option(HOST, "host")
        .option(DATABASE, "database")
        .option(USER, "user")
        .option(PASSWORD, "password")
        .option(Option.valueOf("maxSize"), 5)
        .build();
ConnectionFactories.get(options);
And this is the stacktrace:
Exception in thread "main" java.lang.IllegalArgumentException: min must be less than or equal to max
at reactor.pool.AllocationStrategies$SizeBasedAllocationStrategy.<init>(AllocationStrategies.java:83)
at reactor.pool.PoolBuilder.sizeBetween(PoolBuilder.java:270)
at io.r2dbc.pool.ConnectionPool.createConnectionPool(ConnectionPool.java:139)
at io.r2dbc.pool.ConnectionPool.<init>(ConnectionPool.java:75)
at io.r2dbc.pool.PoolingConnectionFactoryProvider.create(PoolingConnectionFactoryProvider.java:74)
at io.r2dbc.pool.PoolingConnectionFactoryProvider.create(PoolingConnectionFactoryProvider.java:37)
at io.r2dbc.spi.ConnectionFactories.find(ConnectionFactories.java:44)
at io.r2dbc.spi.ConnectionFactories.get(ConnectionFactories.java:74)
From what I have seen, it is not possible to configure the initial size using ConnectionFactoryOptions, only via ConnectionPoolConfiguration.Builder. Is there any reason why it should not be configurable via ConnectionFactoryOptions? Independent of this question: shouldn't the initial size be tied to the maximum size? For example, something like initialSize = maxSize < 10 ? maxSize : 10.
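The suggested tie between initial and maximum size can be sketched as a pure function; this expresses the proposal, not current r2dbc-pool behavior (whose default initial size is a flat 10):

```java
public class PoolSizing {

    /** Current flat default initial size in r2dbc-pool. */
    static final int DEFAULT_INITIAL_SIZE = 10;

    /**
     * Proposed default: never exceed maxSize, i.e.
     * initialSize = maxSize < 10 ? maxSize : 10.
     */
    static int defaultInitialSize(int maxSize) {
        return Math.min(maxSize, DEFAULT_INITIAL_SIZE);
    }

    public static void main(String[] args) {
        System.out.println(defaultInitialSize(5));   // 5 instead of an IllegalArgumentException
        System.out.println(defaultInitialSize(100)); // 10
    }
}
```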
According to reactor/reactor-pool#34, I read that there are three timeouts:
1. A timeout on the allocator (the ConnectionFactory): PoolBuilder.from(allocator.timeout())
2. pool.acquire(Duration) (the new method)
3. pool.acquire().timeout()
@simonbasle, is this a correct understanding?
While thinking about this, I'm considering what would be good for connection pooling.
Currently in r2dbc-pool, the timeouts for 2 and 3 are configurable (maxCreateConnectionTime is for 2, and maxAcquireTime is for 3).
From a connection-pool perspective, I think making 2 and 3 configurable is sufficient.
Q1: But is that too configurable? Should we only make 3 configurable? Or, going the other way, should all of 1, 2, and 3 be configurable?
Q2: Validation. Currently, a user can set the timeout for 3 to 10 seconds and for 2 to 30 seconds.
Should we add validation logic requiring the timeout for 2 to be less than or equal to the timeout for 3?
My opinion is that timeouts for 2 and 3 are sufficient. It might be good to emit a warning message instead of strict validation.
Wondering what others think :)
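For reference, this is how the two currently configurable timeouts (2 and 3) map onto the r2dbc-pool builder; the values and the placeholder URL are illustrative:

```java
import io.r2dbc.pool.ConnectionPoolConfiguration;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;

import java.time.Duration;

// Placeholder URL for illustration.
ConnectionFactory connectionFactory =
        ConnectionFactories.get("r2dbc:postgresql://user:password@host/database");

ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration
        .builder(connectionFactory)
        .maxCreateConnectionTime(Duration.ofSeconds(30)) // timeout 2: opening a new connection
        .maxAcquireTime(Duration.ofSeconds(10))          // timeout 3: acquiring from the pool
        .build();
```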
I'm not able to open more than 10 connections with spring-webflux and r2dbc (with r2dbc-pool driver 0.8.0.M8). My config looks like:
@Configuration
public class PostgresConfig extends AbstractR2dbcConfiguration {

    @Override
    @Bean
    public ConnectionFactory connectionFactory() {
        ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
                .option(DRIVER, "pool")
                .option(PROTOCOL, "postgresql")
                .option(HOST, host)
                .option(USER, user)
                .option(PASSWORD, password)
                .option(DATABASE, database)
                .build());
        ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
                .maxIdleTime(Duration.ofMinutes(30))
                .initialSize(initialSize)
                .maxSize(maxSize)
                .maxCreateConnectionTime(Duration.ofSeconds(10))
                .build();
        return new ConnectionPool(configuration);
    }
}
If initialSize is 10, the application starts but the number of connections remains at 10. If I specify initialSize=20 with a timeout of 10 seconds, this exception occurs:
org.springframework.dao.DataAccessResourceFailureException:
Failed to obtain R2DBC Connection; nested exception is
java.util.concurrent.TimeoutException:
Did not observe any item or terminal signal within 1000ms in 'lift'
(and no fallback has been configured)
at org.springframework.data.r2dbc.connectionfactory.ConnectionFactoryUtils
.lambda$getConnection$0(ConnectionFactoryUtils.java:71)
And the reported timeout is still 1000 ms.
P.S. The maximum number of connections on the Postgres server is 1000.
P.P.S. Typical usage looks like:
return databaseClient.select()
        .from("foo_table")
        .matching(where("id").is(id))
        .as(Foo.class).fetch().one()
        .switchIfEmpty(Mono.error(new FooException()));
Here is the duplicate question on Stack Overflow, in case this is a Spring issue not related to r2dbc-pool:
https://stackoverflow.com/questions/57971278/connection-pool-size-with-postgres-r2dbc-pool
For monitoring purposes, I think the following information needs to be retrievable:
- the ConnectionPoolConfiguration
- a method to evict (invalidate/purge) connections currently in the pool
In-use connections need to be invalidated after they are released. Alternatively, the user should be able to define this behavior via a parameter, or separate methods should be provided for the different behaviors.
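For the read-only side of this request, newer r2dbc-pool versions expose a PoolMetrics view via ConnectionPool.getMetrics(); the eviction and invalidation methods requested above do not exist there. A sketch of reading the metrics, assuming an existing ConnectionPool named pool:

```java
import io.r2dbc.pool.ConnectionPool;

// getMetrics() returns Optional<PoolMetrics>; empty if metrics are unavailable.
pool.getMetrics().ifPresent(metrics -> {
    System.out.println("acquired:        " + metrics.acquiredSize());
    System.out.println("allocated:       " + metrics.allocatedSize());
    System.out.println("idle:            " + metrics.idleSize());
    System.out.println("pending acquire: " + metrics.pendingAcquireSize());
});
```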
My application shows lots of idle connections. I am using R2DBC Postgres with r2dbc-pool, in a Spring WebFlux application with Kotlin support. The relevant Maven dependencies are below:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.2.9</version>
</dependency>
<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-r2dbc</artifactId>
    <version>1.0.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-pool</artifactId>
    <version>0.8.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-spi</artifactId>
    <version>0.8.0.RELEASE</version>
</dependency>
Below is how I am creating a database client:
@Bean
@Primary
@Qualifier("CONNECTION_FACTORY_RR")
fun pool(): ConnectionFactory {
    log.info("[DB_POOL][INIT_SIZE: $initialPoolSize][MAX_SIZE: $maxPoolSize]")
    val config = PostgresqlConnectionConfiguration.builder()
        .host(dbhost)
        .port(dbport)
        .database(database)
        .username(username)
        .password(password)
        .build()
    val factory = PostgresqlConnectionFactory(config)
    val poolConfig = ConnectionPoolConfiguration.builder(factory)
    poolConfig.initialSize(initialPoolSize)
    poolConfig.maxSize(maxPoolSize)
    poolConfig.maxIdleTime(Duration.ofSeconds(IDLE_TIME_OUT))
    poolConfig.validationQuery("SELECT 1")
    return ConnectionPool(poolConfig.build())
}

@Bean
fun databaseClient(connectionFactory: ConnectionFactory): DatabaseClient {
    val dataAccessStrategy = DefaultReactiveDataAccessStrategy(PostgresDialect())
    return DatabaseClient.builder()
        .connectionFactory(connectionFactory)
        .dataAccessStrategy(dataAccessStrategy)
        .build()
}
A sample query is shown below:
fun getAnswerFromId(answerId: String): Mono<TopicAnswer> {
    return databaseClient.execute(
        """
        SELECT
            *
        FROM
            answers
        WHERE
            id = 123456
        """.trimIndent()
    ).asType<TopicAnswer>()
        .fetch()
        .one()
}
What's wrong here? Why are so many DB connections remaining in the idle state?
I'm in a confusing situation.
After a connection times out on the DB server, the connection in r2dbc-pool is not closed and a new one is not created. So the pool becomes completely exhausted (with dead connections still in it) once all pooled connections have timed out on the DB server.
Java 11
ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
        .maxIdleTime(pool.maxIdleTime)
        .initialSize(pool.initialSize)
        .maxSize(pool.maxSize)
        .maxCreateConnectionTime(pool.maxCreateConnectionTime)
        .maxAcquireTime(pool.maxAcquireTime)
        .build();
spring.r2dbc.connection-timeout=3s
spring.r2dbc.pool.initial-size=10
spring.r2dbc.pool.max-size=100
spring.r2dbc.pool.max-idle-time=2m
spring.r2dbc.pool.max-create-connection-time=3s
spring.r2dbc.pool.max-acquire-time=5s
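One common mitigation for dead pooled connections, assuming the driver supports validation, is to validate on acquisition and keep maxIdleTime below the server-side timeout; a sketch (the connectionFactory variable is assumed, and the values are illustrative):

```java
import io.r2dbc.pool.ConnectionPoolConfiguration;
import io.r2dbc.spi.ValidationDepth;

import java.time.Duration;

ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration
        .builder(connectionFactory)
        .maxIdleTime(Duration.ofMinutes(2))       // keep below the server-side idle timeout
        .validationQuery("SELECT 1")              // round-trip check before handing out a connection
        .validationDepth(ValidationDepth.REMOTE)  // validate against the server, not just locally
        .build();
```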
Hi,
In https://github.com/r2dbc/r2dbc-pool/blob/master/src/main/java/io/r2dbc/pool/PooledConnection.java, we can see that in the close() method, if auto-commit is false, the transaction is always rolled back before the connection is released back to the pool. Because of this, if I write connection.commit().then(connection.close()), it throws an exception.
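As a workaround until this behavior changes, the commit can be completed inside a usingWhen scope, so releasing the connection only happens after the commit has finished; a sketch using the R2DBC SPI directly (the statement text and connectionFactory are illustrative):

```java
import io.r2dbc.spi.Connection;
import reactor.core.publisher.Mono;

// usingWhen runs the cleanup publisher (close) only after the resource
// closure, including the explicit commit, has completed.
Mono<Void> work = Mono.usingWhen(
        connectionFactory.create(),
        connection -> Mono.from(connection.beginTransaction())
                .then(Mono.from(connection.createStatement("UPDATE t SET c = 1").execute()).then())
                .then(Mono.from(connection.commitTransaction())),
        Connection::close,                                       // success: commit finished, now release
        (connection, error) -> Mono.from(connection.rollbackTransaction())
                .then(Mono.from(connection.close())),            // error: roll back, then release
        connection -> Mono.from(connection.close()));            // cancel: just release
```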
Methods to suspend/resume the pool.
The behavior needs to be defined for when a connection is requested while the pool is suspended (wait/block, or error).
When a new connection can't be opened because the maximum number of connections has been acquired, it would be nice to be able to configure the pool to wait for a connection to be freed up instead of failing with an exception.
Hi all,
I'm using r2dbc pool as part of a Spring WebFlux project using Spring Cloud hot-refresh capabilities based on consul watches.
In this context, my R2DBC PoolableConnection can get re-created with different settings, which leads to a new ConnectionPool (a SimpleFifoPool in my case) blocking in SimplePool:
SimplePool(DefaultPoolConfig<POOLABLE> poolConfig) {
    // ...
    POOLABLE poolable = Objects.requireNonNull(poolConfig.allocator.block(), "allocator returned null in constructor");
    // ...
}
Since this is executed in a reactive context, this leads to:
Caused by: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-epoll-4
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:77) ~[reactor-core-3.2.6.RELEASE.jar!/:3.2.6.RELEASE]
at reactor.core.publisher.Mono.block(Mono.java:1494) ~[reactor-core-3.2.6.RELEASE.jar!/:3.2.6.RELEASE]
at reactor.pool.SimplePool.<init>(SimplePool.java:72) ~[reactor-pool-0.0.1.M1.jar!/:0.0.1.M1]
at reactor.pool.SimpleFifoPool.<init>(SimpleFifoPool.java:41) ~[reactor-pool-0.0.1.M1.jar!/:0.0.1.M1]
at reactor.pool.PoolBuilder.build(PoolBuilder.java:296) ~[reactor-pool-0.0.1.M1.jar!/:0.0.1.M1]
at io.r2dbc.pool.ConnectionPool.createConnectionPool(ConnectionPool.java:118) ~[r2dbc-pool-0.8.0.M8.jar!/:na]
Could you provide any ideas on how to handle this nicely?
Hi, see the error below. This happened after upgrading to the latest milestone (M5) of Spring Boot.
An attempt was made to call a method that does not exist. The attempt was made from the following location:
io.r2dbc.pool.ConnectionPool.createConnectionPool(ConnectionPool.java:102)
The following method did not exist:
reactor.pool.PoolBuilder.initialSize(I)Lreactor/pool/PoolBuilder;
I've got a problem with the dispose() method. I'm a newbie with R2DBC, Kotlin coroutines, and reactive programming in general; it's probably a silly error, but I don't see it... Can you help?
class PoolTest {

    lateinit var dbConnectionPool: ConnectionPool
    lateinit var dbClient: R2dbc

    @BeforeEach
    fun beforeEach() {
        val dbConnectionFactory = ConnectionFactories.get(
            ConnectionFactoryOptions
                .builder()
                .option(Option.valueOf("driver"), "pool")
                .option(Option.valueOf("protocol"), "postgresql")
                .option(Option.valueOf("host"), "localhost")
                .option(Option.valueOf("port"), 5432)
                .option(Option.valueOf("database"), "xxx")
                .option(Option.valueOf("user"), "xxx")
                .option(Option.valueOf("password"), "xxx")
                .build()
        )
        dbConnectionPool = ConnectionPool(
            ConnectionPoolConfiguration
                .builder(dbConnectionFactory)
                .maxSize(10)
                .build()
        )
        dbClient = R2dbc(dbConnectionPool)
    }

    @AfterEach
    fun afterEach() {
        dbConnectionPool.dispose()
    }

    @Test
    fun test1() = runBlocking {
        val result = dbClient
            .withHandle { it.select("SELECT 1").mapRow { row, _ -> row[0] as Int } }
            .awaitFirst()
        println(result)
    }

    @Test
    fun test2() = runBlocking {
        val result = dbClient
            .withHandle { it.select("SELECT 2").mapRow { row, _ -> row[0] as Int } }
            .awaitFirst()
        println(result)
    }

    @Test
    fun test3() = runBlocking {
        val result = dbClient
            .withHandle { it.select("SELECT 3").mapRow { row, _ -> row[0] as Int } }
            .awaitFirst()
        println(result)
    }

    @Test
    fun test4() = runBlocking {
        val result = dbClient
            .withHandle { it.select("SELECT 4").mapRow { row, _ -> row[0] as Int } }
            .awaitFirst()
        println(result)
    }

    @Test
    fun test5() = runBlocking {
        val result = dbClient
            .withHandle { it.select("SELECT 5").mapRow { row, _ -> row[0] as Int } }
            .awaitFirst()
        println(result)
    }

    @Test
    fun test6() = runBlocking {
        val result = dbClient
            .withHandle { it.select("SELECT 6").mapRow { row, _ -> row[0] as Int } }
            .awaitFirst()
        println(result)
    }

    @Test
    fun test7() = runBlocking {
        val result = dbClient
            .withHandle { it.select("SELECT 7").mapRow { row, _ -> row[0] as Int } }
            .awaitFirst()
        println(result)
    }

    @Test
    fun test8() = runBlocking {
        val result = dbClient
            .withHandle { it.select("SELECT 8").mapRow { row, _ -> row[0] as Int } }
            .awaitFirst()
        println(result)
    }

    @Test
    fun test9() = runBlocking {
        val result = dbClient
            .withHandle { it.select("SELECT 9").mapRow { row, _ -> row[0] as Int } }
            .awaitFirst()
        println(result)
    }

    @Test
    fun test10() = runBlocking {
        val result = dbClient
            .withHandle { it.select("SELECT 10").mapRow { row, _ -> row[0] as Int } }
            .awaitFirst()
        println(result)
    }

    @Test
    fun test11() = runBlocking {
        val result = dbClient
            .withHandle { it.select("SELECT 11").mapRow { row, _ -> row[0] as Int } }
            .awaitFirst()
        println(result)
    }
}
The first 10 tests pass, but the 11th test fails with this error:
sorry, too many clients already
io.r2dbc.postgresql.ExceptionFactory$PostgresqlNonTransientResourceException: [53300] sorry, too many clients already
at io.r2dbc.postgresql.ExceptionFactory.createException(ExceptionFactory.java:98)
at io.r2dbc.postgresql.ExceptionFactory.handleErrorResponse(ExceptionFactory.java:110)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:96)
at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.onNext(FluxHandle.java:319)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drainRegular(FluxWindowPredicate.java:650)
at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drain(FluxWindowPredicate.java:728)
at reactor.core.publisher.FluxWindowPredicate$WindowFlux.onNext(FluxWindowPredicate.java:770)
at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.onNext(FluxWindowPredicate.java:249)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:112)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:206)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:329)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:91)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:794)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:424)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:326)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
If I run \watch SELECT query FROM pg_stat_activity WHERE query != ''; in psql in parallel with the test execution, I can see the number of open connections increase by 10 with each test. I tried adding a Thread.sleep(5000) after dbConnectionPool.dispose(), but nothing changed.
Versions:
Thanks!
Right now, a ConnectionPoolConfiguration.Builder requires a ConnectionFactory to create the builder instance. Requiring a ConnectionFactory during configuration can be problematic, as it already requires the actual resource. We should allow builder creation without a ConnectionFactory so that it can be supplied later via a connectionFactory(…) builder method.
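With the proposed change, usage could look like this hypothetical sketch (the no-argument builder() and the connectionFactory(…) method are the suggested additions, not the current API; connectionFactory is assumed to exist):

```java
import java.time.Duration;

// Hypothetical: configure first, supply the actual resource later.
ConnectionPoolConfiguration.Builder builder = ConnectionPoolConfiguration.builder()
        .maxSize(20)
        .maxIdleTime(Duration.ofMinutes(30));

// Later, once the ConnectionFactory is available:
ConnectionPool pool = new ConnectionPool(
        builder.connectionFactory(connectionFactory).build());
```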
Case:
Mysql server 5.7.27, timeout = 180s
spring.r2dbc.connection-timeout=3s
spring.r2dbc.pool.initial-size=10
spring.r2dbc.pool.max-size=200
spring.r2dbc.pool.max-idle-time=10s
spring.r2dbc.pool.validation-query=SELECT 1
spring.r2dbc.pool.max-create-connection-time=3s
spring.r2dbc.pool.max-acquire-time=5s
spring.r2dbc.pool.max-life-time=180s
spring-data-r2dbc 1.0.0.RC1
r2dbc-pool 0.8.0.RC1
r2dbc-mysql 0.8.0.RC2
It would be helpful if someone uploaded a diagram illustrating the difference between traditional JDBC and R2DBC, similar to the one below:
https://ejbvn.files.wordpress.com/2008/11/95.jpg
That one makes the connection pool easy to understand. It would be great to have diagrams for easy understanding of the core concepts: the pooling difference, blocking vs. non-blocking I/O with an RDBMS, etc.
How do I use this library to connect to MySQL in Spring?
We should retain the PoolMetricsWrapper instance to prevent NaN gauge values.
The corresponding issue in the Spring Boot R2DBC project is: spring-attic/spring-boot-r2dbc#31
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-postgresql</artifactId>
    <version>0.8.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-pool</artifactId>
    <version>0.8.0.BUILD-SNAPSHOT</version>
</dependency>
java.lang.ArrayIndexOutOfBoundsException: Index 0 out of bounds for length 0
at reactor.core.publisher.FluxOnAssembly$OnAssemblyException.toTuple(FluxOnAssembly.java:220) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxOnAssembly$OnAssemblyException.<init>(FluxOnAssembly.java:206) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.fail(FluxOnAssembly.java:410) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:354) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.Operators.error(Operators.java:182) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxError.subscribe(FluxError.java:43) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxOnAssembly.subscribe(FluxOnAssembly.java:106) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxCallableOnAssembly.subscribe(FluxCallableOnAssembly.java:51) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.Flux.subscribe(Flux.java:7800) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxUsingWhen$ResourceSubscriber.onNext(FluxUsingWhen.java:201) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:349) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:349) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:349) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:349) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:349) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:349) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:349) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:349) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:349) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:204) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:349) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:349) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.pool.AbstractPool$Borrower.deliver(AbstractPool.java:341) ~[reactor-pool-0.0.1.BUILD-20190610.214858-38.jar:0.0.1.BUILD-SNAPSHOT]
at reactor.pool.SimplePool.lambda$drainLoop$6(SimplePool.java:204) ~[reactor-pool-0.0.1.BUILD-20190610.214858-38.jar:0.0.1.BUILD-SNAPSHOT]
at reactor.core.scheduler.ImmediateScheduler.schedule(ImmediateScheduler.java:47) ~[reactor-core-3.3.0.M1.jar:3.3.0.M1]
at reactor.pool.SimplePool.drainLoop(SimplePool.java:204) ~[reactor-pool-0.0.1.BUILD-20190610.214858-38.jar:0.0.1.BUILD-SNAPSHOT]
at reactor.pool.SimplePool.drain(SimplePool.java:145) ~[reactor-pool-0.0.1.BUILD-20190610.214858-38.jar:0.0.1.BUILD-SNAPSHOT]
at reactor.pool.SimplePool.doAcquire(SimplePool.java:113) ~[reactor-pool-0.0.1.BUILD-20190610.214858-38.jar:0.0.1.BUILD-SNAPSHOT]
at reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:305) ~[reactor-pool-0.0.1.BUILD-20190610.214858-38.jar:0.0.1.BUILD-SNAPSHOT]
We are using r2dbc-pool along with r2dbc-postgresql, and despite setting the maxSize of the pool configuration, it appears not to be respected. Here is the config, inheriting from AbstractR2dbcConfiguration:
@Bean
override fun connectionFactory(): ConnectionFactory {
    val connectionFactory = ConnectionFactories.get(builder()
        .option(DRIVER, "postgresql")
        .option(HOST, host)
        .option(PORT, port)
        .option(USER, username)
        .option(PASSWORD, password)
        .option(DATABASE, database)
        .build())
    val configuration = ConnectionPoolConfiguration.builder(connectionFactory)
        .initialSize(1)
        .maxSize(1)
        .registerJmx(false)
        .build()
    return ConnectionPool(configuration)
}