rxjava3-pool
Reactive object pool for use with RxJava 3.x. A core artifact for rxjava3-jdbc.
A pool is initialized with n Member objects, where n is the maximum size of the pool.
Each Member object is either empty or holds an initialized value (a database connection, for example).
The state diagram for a Member is below. The states in green have entry procedures that can be run on user-specified schedulers (see the example of creating a NonBlockingPool below).
A Pool conforms to this interface:
public interface Pool<T> extends AutoCloseable {
    Single<Member<T>> member();
}
A Member is an interface like this (slightly simplified):
public interface Member<T> {
    T value();

    /**
     * This method should not throw. Feel free to add logging so that you are aware
     * of a problem with disposal.
     */
    void disposeValue();

    void checkin();
}
This library provides one implementation of Pool, namely NonBlockingPool. Here's an example that creates one:
Pool<Connection> pool =
    NonBlockingPool
        .factory(() -> DriverManager.getConnection(url))
        .checkinDecorator(checkInDecorator)
        .idleTimeBeforeHealthCheck(30, TimeUnit.SECONDS)
        .maxIdleTime(300, TimeUnit.SECONDS)
        .createRetryInterval(10, TimeUnit.SECONDS)
        .scheduler(Schedulers.io())
        // the disposer must not throw, so log the failure instead
        .disposer(c -> { try { c.close(); } catch (Throwable e) { log.warn(e.getMessage(), e); } })
        .healthCheck(c -> true)
        .maxSize(10)
        .build();
The Single<Member<T>> returned by Pool.member() can be subscribed to as many times as you like, concurrently if desired. Each subscriber receives a Member that holds a value. When the subscriber has finished with the value, it should call Member.checkin() to return the item to the pool.
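The check-in contract can be sketched with a small helper. This is a minimal, standalone illustration and not library code: Member is redeclared locally to mirror the simplified interface shown earlier so that the sketch compiles without the library, and useAndCheckin and countingMember are hypothetical names introduced here.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Local stand-in mirroring the simplified Member interface shown earlier,
// redeclared here only so the sketch compiles without the library.
interface Member<T> {
    T value();
    void disposeValue();
    void checkin();
}

final class CheckinSketch {

    // Hypothetical helper (not part of rxjava3-pool): applies `work` to the
    // member's value and always checks the member back in, even if `work` throws.
    static <T, R> R useAndCheckin(Member<T> member, Function<? super T, ? extends R> work) {
        try {
            return work.apply(member.value());
        } finally {
            member.checkin();
        }
    }

    // Hypothetical test double: counts check-ins so the behavior can be observed.
    static Member<String> countingMember(String value, AtomicInteger checkins) {
        return new Member<String>() {
            @Override public String value() { return value; }
            @Override public void disposeValue() { /* nothing to release */ }
            @Override public void checkin() { checkins.incrementAndGet(); }
        };
    }

    public static void main(String[] args) {
        AtomicInteger checkins = new AtomicInteger();
        Member<String> member = countingMember("conn-1", checkins);
        int length = useAndCheckin(member, String::length);
        System.out.println(length + " " + checkins.get()); // prints "6 1"
    }
}
```

With the library's own types, the same try/finally shape would presumably sit inside the subscriber of pool.member(), so that the member goes back to the pool even when the work fails.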
Note that neither the dispose action nor the health check action should throw. The initializing (factory) action may throw; if it does, creation is retried at the user-specified interval (createRetryInterval in the example above).
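One way to keep these actions from throwing is to wrap them. The following is a minimal sketch under stated assumptions: safeDisposer, safeHealthCheck and ThrowingCheck are hypothetical names, the sketch uses java.util.function types so that it compiles standalone (the library's builder may expect different functional interfaces), and treating a failed health check as "unhealthy" is a design choice of this sketch rather than documented library behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

final class NonThrowingActions {

    // Hypothetical helper: closes the resource and hands any failure to
    // onError (for example a logger) instead of letting it propagate.
    static <T extends AutoCloseable> Consumer<T> safeDisposer(Consumer<? super Throwable> onError) {
        return resource -> {
            try {
                resource.close();
            } catch (Throwable e) {
                onError.accept(e);
            }
        };
    }

    // Hypothetical functional interface for a check that may throw.
    interface ThrowingCheck<T> {
        boolean test(T value) throws Exception;
    }

    // Hypothetical helper: a check that throws is reported as unhealthy
    // (assumption of this sketch) rather than propagating the exception.
    static <T> Predicate<T> safeHealthCheck(ThrowingCheck<? super T> check) {
        return value -> {
            try {
                return check.test(value);
            } catch (Exception e) {
                return false;
            }
        };
    }

    public static void main(String[] args) {
        List<Throwable> logged = new ArrayList<>();
        Consumer<AutoCloseable> disposer = safeDisposer(logged::add);
        disposer.accept(() -> { throw new IllegalStateException("close failed"); });

        Predicate<String> check = safeHealthCheck(s -> { throw new Exception("probe failed"); });
        System.out.println(logged.size() + " " + check.test("x")); // prints "1 false"
    }
}
```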