reactor-pool's Introduction

Reactor-Pool


The reactor-pool project aims to provide a generic object pool for reactive applications that:

  • exposes a reactive API (Publisher input types, Mono return types)
  • is non-blocking (never blocks a caller that attempts to acquire() a resource)
  • has lazy acquire behavior

For use-cases where granular control of the release() is needed, the classic acquire() path is offered: it exposes a PooledRef wrapper around the resource, which also gives access to statistics about the resource's lifecycle in the pool.

// given Pool<T> pool
Mono<PooledRef<T>> grabResource = pool.acquire();
//no resource is actually requested yet at this point

grabResource.subscribe();
//now one resource is requested from the pool asynchronously

//Another example, this time synchronously acquiring, immediately followed up by a release:
PooledRef<T> ref = grabResource.block(); //second subscription requests a second resource
ref.release().block(); //release() is also asynchronous and lazy

For use-cases where the resource itself can be consumed reactively (i.e. it exposes a reactive API), a scoped mode of acquisition is offered through withPoolable, which:

  • lets the consumer declaratively use the resource
  • provides a scope / closure in which the resource is acquired, used as instructed, and released automatically
  • avoids dealing with an indirection (the resource is directly exposed)

//given a DbConnection type and a Pool<DbConnection> pool
pool.withPoolable(resource -> resource
    //we declare using the connection to create a Statement...
    .createStatement()
    //...then performing a SELECT query...
    .flatMapMany(st -> st.query("SELECT * FROM foo"))
    //...then marshalling the rows to JSON
    .map(row -> rowToJson(row))
    //(all of which need the live resource)
)
//at this point the rest of the steps are outside the scope
//so the resource can be released
.map(json -> sanitize(json));

Licensed under Apache Software License 2.0


reactor-pool's Issues

SimpleDequePool not getting drained

I have a Spring Boot project that uses reactor-netty for WebClient, which depends on reactor-pool. I'm using WebClient to make API calls, and I'm facing an issue where WebClient cannot create new connections for incoming requests. All connection requests to the pool are added to the pending queue, which soon fills up (10000 is the max-queue-size in our case); after that, all requests fail with a PoolAcquirePendingLimitException.

Graph for the pool metrics: (WebclientMetrics graph attached to the original issue)

Expected Behavior

Queue draining should never fail, under any condition.

Actual Behavior

The pending queue is not being drained and remains full.

Steps to Reproduce

Unfortunately, I haven't been able to reproduce this on my local machine; I've only witnessed it in the production environment, randomly, in about 10% of the running instances.
If it helps in any manner: just before the issue occurs, a huge number of ConnectionExceptions appear.

Possible Problem

After some investigation, I found something that might be causing this issue.
Please note that I wasn't able to reproduce the scenario I've written below.

From what I understand of the source code of SimpleDequePool.java, new requests (borrowers) are added to the pending queue, and then a call to drain() the pending queue is made.
Before actually draining (drainLoop()), WIP is incremented.
drainLoop() is responsible for handling the current borrower as well as any other borrowers that incremented WIP while checking the (WIP.getAndIncrement(this) == 0) condition; it then decrements WIP for itself as well as for those other borrowers.
This is because a single drainLoop() call does not break out of its infinite loop until WIP.decrementAndGet(this) == 0.

If, for whatever reason, a call to drainLoop() fails to decrement WIP, further requests to drain() will never pass the if condition below, and borrowers will keep being added to the pending queue without ever being handled:

void drain() {
    if (WIP.getAndIncrement(this) == 0) {
        drainLoop();
    }
}

The fact that the metrics show pending connections = 10000 and no active connections makes me believe this is the condition causing the issue.
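For illustration, here is a minimal, self-contained sketch of the work-in-progress (WIP) drain pattern described above (not the actual SimpleDequePool code); it shows why a single missed decrement permanently disables draining:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified WIP drain pattern: only the thread that moves WIP from 0 to 1
// enters drainLoop(); every other caller merely signals that more work exists.
final class DrainDemo {
    final AtomicInteger wip = new AtomicInteger();
    final Queue<Runnable> pending = new ConcurrentLinkedQueue<>();

    void offer(Runnable borrower) {
        pending.offer(borrower);
        drain();
    }

    void drain() {
        if (wip.getAndIncrement() == 0) { // 0 -> 1: this caller owns the loop
            drainLoop();
        } // otherwise the owning thread will pick up the new work
    }

    void drainLoop() {
        int missed = 1;
        for (;;) {
            Runnable next;
            while ((next = pending.poll()) != null) {
                next.run(); // if this throws, WIP is never decremented...
            }
            missed = wip.addAndGet(-missed); // ...and drain() is dead forever after
            if (missed == 0) {
                return;
            }
        }
    }
}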

Environment

  • Reactor version(s) used:
  1. reactor-netty: 0.9.12.RELEASE
  2. reactor-pool: 0.1.6.RELEASE
  • Spring Boot: 2.2.10.RELEASE
  • JVM version: openjdk version "1.8.0_181"
  • OS and version: alpine:3.6

Add a `PriorityQueue`-for-idle-resources variant

Motivation

An extension of #87 where criteria other than Least/Most-Recently-Used would be possible for the polling order of idle resources (e.g. oldest created).

The priority queue opens up more usages, since a Comparator<PooledRef> can be used. For instance, one could prioritize the youngest/oldest resources (in terms of creation time, independently of when they were last released) or sort by number of acquisitions, as long as the "priority" does not change after insertion.
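As an illustration, here is a hedged sketch of such comparators over a metadata view; the accessors are hypothetical at this point (the first bullet below proposes adding them):

import java.util.Comparator;

// hypothetical metadata accessors; this issue proposes adding them
interface RefMetadata {
    long allocationTimestamp(); // creation time: absolute, never changes
    long acquireCount();
}

final class IdleOrderings {
    // oldest-created first: a stable priority, since the creation
    // timestamp cannot change after insertion
    static final Comparator<RefMetadata> OLDEST_CREATED =
            Comparator.comparingLong(RefMetadata::allocationTimestamp);

    // fewest acquisitions first; NOTE: acquireCount changes over time,
    // so this is only safe if refs are re-inserted after each release
    static final Comparator<RefMetadata> LEAST_ACQUIRED =
            Comparator.comparingLong(RefMetadata::acquireCount);
}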

This implies:

  • Adding accessors for the PooledRef release timestamp (and creation timestamp): "age" is a function of the current time and is thus too dynamic to serve as a priority, whereas a timestamp is absolute
  • A possible performance loss: since the PriorityQueue interface relies on a Comparator, implementations may be less efficient than the simple baked-in FIFO/LIFO of Deque implementations

At the same time, it would better support maintaining order when polling-and-reinserting, which could help recover from some CAS failures.

Desired solution

An efficient priority queue implementation would be necessary, but the JDK only provides a binary-heap implementation that is not thread-safe (PriorityQueue) and a thread-safe version that relies heavily on blocking/locks (PriorityBlockingQueue).

Considered alternatives

Implement one of the papers below.

Additional context

To our knowledge, no efficient lock-free implementation of a priority queue exists in Java.
For reference, here are a few recent papers on concurrent priority queues:

Support MRU poll order for idle resources

Motivation

There are currently two flavors of pools, fifo and lifo, but these attributes apply to the order in which pending acquire() calls are served once a resource becomes available (i.e. no resource is available and no resource can be created, but several acquire() calls have been attempted).

The reverse situation, where resources are idle in the pool when a single acquire() comes in, is not affected by this parameter. Instead, provided there are no pending acquires, the pool always produces idle resources in "fifo" insertion order (effectively equivalent to a Least-Recently-Used order).

Desired solution

Add a way to tune the order in which idle resources are chosen when a new acquire() comes in. At a minimum, the following use case should be covered:

When dealing with a TCP connection, some remote servers may close the connection if it has been inactive for too long. To avoid receiving this kind of error entirely, using an MRU (Most-Recently-Used) order when polling for an idle resource could be beneficial.

This should be achievable by either using a Deque (on which we would call either pollFirst() or pollLast()) or some form of PriorityQueue (see #88).

Moving away from the Reactor MPSC queue also enables iteration, a prerequisite for background eviction (#86).
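A minimal sketch of the two polling orders on a Deque, with idle resources appended at the tail on release:

import java.util.ArrayDeque;
import java.util.Deque;

final class PollOrderDemo {
    public static void main(String[] args) {
        Deque<String> idle = new ArrayDeque<>();
        // released in this order: "oldest" first, "newest" last
        idle.offerLast("conn-1");
        idle.offerLast("conn-2");
        idle.offerLast("conn-3");

        // LRU (current behavior): take from the head, i.e. least recently released
        System.out.println("LRU picks " + idle.peekFirst()); // conn-1
        // MRU (desired option): take from the tail, i.e. most recently released,
        // the connection least likely to have been closed by the remote server
        System.out.println("MRU picks " + idle.peekLast());  // conn-3
    }
}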

Challenges

  • The current implementation uses a lock-free MPSC unbounded queue. The alternative must be efficient enough to stand comparison with that structure.
  • Under low demand, an MRU scheme means old connections are less likely to be visited, so they linger in the queue and won't get evicted. This makes #86 (background eviction) even more important to have, since MRU increases the symptoms of low pool utilization.

"evictionidle()" and "evictionPredicate()" on PoolBuilder cannot be used together

This is because evictionIdle() internally sets evictionPredicate() with an idle predicate; thus, a predicate set earlier, or another one set later via evictionPredicate(), will override the existing one.

This can be worked around on the caller side by creating a composite predicate that combines the idle-time check with whatever other predicate logic is needed, as sketched below.
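A hedged sketch of that workaround, assuming the eviction predicate is a BiPredicate over the resource and its metadata (the types here are illustrative):

import java.time.Duration;
import java.util.function.BiPredicate;

final class CompositeEviction {
    interface Meta { long idleTime(); }   // illustrative metadata view
    interface Conn { boolean isOpen(); }  // illustrative resource type

    static BiPredicate<Conn, Meta> composite() {
        // the idle-time check that evictionIdle(Duration) would install
        BiPredicate<Conn, Meta> idleTooLong =
                (conn, meta) -> meta.idleTime() > Duration.ofSeconds(30).toMillis();
        // any additional health-based eviction logic
        BiPredicate<Conn, Meta> broken = (conn, meta) -> !conn.isOpen();
        // OR semantics: evict when either condition holds
        return idleTooLong.or(broken);
    }
}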

I am wondering whether it would be better to have such a composite predicate on the pool builder side: instead of evictionIdle() and evictionPredicate() overriding each other, they would simply add to the composite predicate, chained with "OR" semantics.

Race condition when releasing at the same time the pool is disposed

Expected Behavior

Releasing a resource to a pool that is being disposed should result in the resource being properly destroyed, as per destroyHandler.

Actual Behavior

Sometimes the resource is not destroyed.

Steps to Reproduce

	@ParameterizedTest
	@MethodSource("allPools")
	void releaseRacingWithPoolClose(Function<PoolBuilder<AtomicInteger, ?>, AbstractPool<AtomicInteger>> configAdjuster)
			throws InterruptedException {
		for (int i = 0; i < 10_000; i++) {
			final int round = i + 1;
			ConcurrentLinkedQueue<AtomicInteger> created = new ConcurrentLinkedQueue<>();
			PoolBuilder<AtomicInteger, PoolConfig<AtomicInteger>> builder =
					PoolBuilder.from(Mono.fromCallable(() -> {
						AtomicInteger resource = new AtomicInteger(round);
						created.add(resource);
						return resource;
					}))
					           .evictionPredicate((obj, meta) -> false)
					           .destroyHandler(ai -> Mono.fromRunnable(() -> ai.set(-1)))
					           .sizeBetween(0, 4);

			InstrumentedPool<AtomicInteger> pool = configAdjuster.apply(builder);

			PooledRef<AtomicInteger> ref = pool.acquire().block();

			final CountDownLatch latch = new CountDownLatch(2);
			//acquire-and-release, vs pool disposal
			RaceTestUtils.race(
					() -> ref.release().doFinally(__ -> latch.countDown()).subscribe(),
					() -> pool.disposeLater().doFinally(__ -> latch.countDown()).subscribe()
			);

			assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch counted down").isTrue();
			assertThat(pool.isDisposed()).as("pool isDisposed").isTrue();
			assertThat(pool.metrics().idleSize()).as("pool has no idle elements").isZero();

			assertThat(created).allSatisfy(ai -> assertThat(ai).hasValue(-1));
		}
	}

	@ParameterizedTest
	@MethodSource("allPools")
	void poolCloseRacingWithRelease(Function<PoolBuilder<AtomicInteger, ?>, AbstractPool<AtomicInteger>> configAdjuster)
			throws InterruptedException {
		for (int i = 0; i < 10_000; i++) {
			final int round = i + 1;
			ConcurrentLinkedQueue<AtomicInteger> created = new ConcurrentLinkedQueue<>();
			PoolBuilder<AtomicInteger, PoolConfig<AtomicInteger>> builder =
					PoolBuilder.from(Mono.fromCallable(() -> {
						AtomicInteger resource = new AtomicInteger(round);
						created.add(resource);
						return resource;
					}))
					           .evictionPredicate((obj, meta) -> false)
					           .destroyHandler(ai -> Mono.fromRunnable(() -> ai.set(-1)))
					           .sizeBetween(0, 4);

			InstrumentedPool<AtomicInteger> pool = configAdjuster.apply(builder);

			PooledRef<AtomicInteger> ref = pool.acquire().block();

			final CountDownLatch latch = new CountDownLatch(2);
			//acquire-and-release, vs pool disposal
			RaceTestUtils.race(
					() -> pool.disposeLater().doFinally(__ -> latch.countDown()).subscribe(),
					() -> ref.release().doFinally(__ -> latch.countDown()).subscribe()
			);

			assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch counted down").isTrue();
			assertThat(pool.isDisposed()).as("pool isDisposed").isTrue();
			assertThat(pool.metrics().idleSize()).as("pool has no idle elements").isZero();
			assertThat(created).allSatisfy(ai -> assertThat(ai).hasValue(-1));
		}
	}

Switch to Github Actions for CI

See reactor/reactor#690 for context.

This includes both PR checks and push checks on maintenance branches.
We'll also need to remove the .travis.yml file and make the following repo config changes:

  • remove travis app integration
  • change the "required checks" to include the new ci jobs

Document how to emulate async evictionPredicate

We may need something to check, on acquisition, whether a resource is healthy (and if not, send it through the eviction/destroy lifecycle).

For instance, in reactor-netty we check whether the connection is still alive before handing it out.
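Until this is documented, here is a hedged sketch of one way to emulate an async eviction check at acquisition time, wrapping acquire() with a reactive health probe (isAlive() is a hypothetical method on the resource type):

import reactor.core.publisher.Mono;
import reactor.pool.Pool;
import reactor.pool.PooledRef;

final class HealthyAcquire {
    interface Conn { Mono<Boolean> isAlive(); } // hypothetical probe

    static Mono<PooledRef<Conn>> acquireHealthy(Pool<Conn> pool) {
        return pool.acquire()
                   .flatMap(ref -> ref.poolable().isAlive()
                           .flatMap(alive -> {
                               if (alive) {
                                   return Mono.just(ref);
                               }
                               // destroy the dead resource, then signal a retry
                               return ref.invalidate()
                                         .then(Mono.<PooledRef<Conn>>error(
                                                 new IllegalStateException("dead resource")));
                           }))
                   .retry(3); // bound the retries so we cannot loop forever
    }
}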

Propagate Context from acquire to the resource creation

Motivation

Reactor Netty needs to obtain the information stored in the Context (present when acquire is invoked) while creating the resource, which in the Reactor Netty use case means establishing a connection to the remote peer.
This feature request is triggered by reactor/reactor-netty#1327

Desired solution

The Context information provided when acquire is invoked should be available when establishing the connection to the remote peer.
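A hedged sketch of the desired behavior, assuming a recent reactor-core (3.4+) with deferContextual; the context key and connect step are purely illustrative:

import reactor.core.publisher.Mono;
import reactor.util.context.Context;

final class ContextualAllocator {
    // the allocator reads the subscriber Context of the acquire() call
    static Mono<String> allocator() {
        return Mono.deferContextual(ctx -> {
            String tenant = ctx.getOrDefault("tenant", "default"); // illustrative key
            return connectTo(tenant);
        });
    }

    static Mono<String> connectTo(String tenant) { // illustrative connect step
        return Mono.just("connection-for-" + tenant);
    }

    public static void main(String[] args) {
        // the Context written downstream of acquire() would reach the allocator
        allocator().contextWrite(Context.of("tenant", "acme"))
                   .subscribe(System.out::println); // connection-for-acme
    }
}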

Make AllocationStrategy contract able to return *more* permits than requested, for a minimum size

The use case is a floor on the number of resources, despite getPermits(n) requests for fewer.

The first acquire would trigger the AllocationStrategy to give out minSize permits, effectively instructing the pool to create 1 resource for the current acquire plus minSize - 1 resources to be "set aside".

Some implementations might consider that a "bad idea" (e.g. such resources would be tied to the current thread, where we'd prefer late binding), in which case the implementation could elect to returnPermits(effectivePermits - n).
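A hedged sketch of such a strategy (illustrative only, not the actual reactor-pool AllocationStrategy contract, and deliberately simplified rather than fully race-proof):

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class MinSizeStrategy {
    private final int minSize;
    private final AtomicInteger remaining;                     // permits left to grant
    private final AtomicBoolean firstGrant = new AtomicBoolean(true);

    MinSizeStrategy(int minSize, int maxSize) {
        this.minSize = minSize;
        this.remaining = new AtomicInteger(maxSize);
    }

    // May return MORE than desired: the first grant bumps the request up to
    // minSize, so the pool creates 1 resource for the current acquire plus
    // minSize - 1 resources to set aside.
    int getPermits(int desired) {
        int wanted = firstGrant.compareAndSet(true, false)
                ? Math.max(desired, minSize)
                : desired;
        int granted = Math.min(wanted, remaining.get());
        remaining.addAndGet(-granted);
        return granted;
    }

    // An implementation that dislikes the extra permits can hand them back.
    void returnPermits(int n) {
        remaining.addAndGet(n);
    }
}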

Concurrent acquires can lead to missed acquisitions in SimplePool

Starting with the Dysprosium-M3 release, when executing multiple HTTP requests in parallel, the PooledConnectionProvider only utilizes 2 connections, therefore not allowing sufficient parallelism.

Prior to Dysprosium-M3, the PooledConnectionProvider properly utilized up to its max connections.

So, this is a regression.

Expected Behavior

When executing multiple HTTP requests in parallel, I expect the PooledConnectionProvider to allow up to its maxConnections to be created and utilized.

Actual Behavior

PooledConnectionProvider is only creating and utilizing 2 connections

Steps to Reproduce

Example project with failing unit test attached.

The unit test executes parallel requests with the following connection providers:

  • ConnectionProvider.newConnection(),
  • ConnectionProvider.elastic("elastic"),
  • ConnectionProvider.fixed("fixed", 100)

Only the ConnectionProvider.newConnection() works with >= Dysprosium-M3. The other two fail.

All tests work with <= Dysprosium-M2

Possible Solution

none

Your Environment

  • Reactor version(s) used:
    • Fails with >= Dysprosium-M3 (Tried with Dysprosium-M3 and Dysprosium-RC1)
    • Works with <= Dysprosium-M2
  • JVM version (java -version):
    • 11.0.4
  • OS and version (eg uname -a):
    • Windows 10

Generalize accepted Publisher types

From the exercise of building r2dbc-pool, I have some feedback to share regarding the API. In general, it is perfectly usable and feels consistent. Some of the method names read strangely (sizeMax vs maxSize), but that's just cosmetics.

I found three items that I'd suggest streamlining (high-level things, nothing serious I guess):

  1. Broaden PoolBuilder.from(Mono<T>) to PoolBuilder.from(Mono<? extends T>) to avoid downcasting hassle if the Mono returns a subtype and we want to use a super-typed pool.
  2. Accept Publisher in PoolBuilder.from() to avoid repeating Mono.from(…) calls.
  3. Accept Publisher in destroyHandler(…)/releaseHandler(…) functions.

From an R2DBC SPI perspective, where the whole API is Publisher-based, these suggestions allow streamlining the code without adding casts or converting to Mono first; item 1 is illustrated below.
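A minimal illustration of the downcasting hassle in item 1, using toy types (the PoolBuilder calls are shown as comments only):

import reactor.core.publisher.Mono;

final class VarianceDemo {
    static class Connection {}
    static class PostgresConnection extends Connection {}

    public static void main(String[] args) {
        Mono<PostgresConnection> allocator = Mono.fromSupplier(PostgresConnection::new);

        // with from(Mono<T>), building a Pool<Connection> needs a downcast:
        //   PoolBuilder.from(allocator.cast(Connection.class))
        // with from(Mono<? extends T>), the allocator would be accepted directly:
        //   PoolBuilder.<Connection>from(allocator)
        Mono<Connection> workaround = allocator.cast(Connection.class); // today's hassle
        workaround.subscribe(c -> System.out.println("allocated " + c));
    }
}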

I added the feedback here as I felt this is the right group with the appropriate visibility, instead of dumping it into Slack.

Allow pluggable strategy to chose the next pending acquire to serve

This concern is orthogonal to thread affinity. Currently, QueuePool has a basic FIFO implementation, and AffinityPool uses FIFO for a given SubPool, as well as a slow path that might be totally unfair (as it always loops through the subpools in the Map's order).

A LIFO option would probably make sense for some use cases.

items returned to the pool multiple times

I originally reported this as r2dbc/r2dbc-pool#92 but was recommended to bring it here (the following is an abbreviated form of what's there).

Initially observed on R2DBC Arabba-SR6, still present on Arabba-SR7.

  • target is PostgreSQL 12 (AWS RDS).
  • openjdk version "11.0.7.0.101" 2020-07-14 LTS (Zulu)

A given PostgresqlConnection is being provided to multiple pool users simultaneously.

With Arabba-SR7, I started seeing the following stack trace (there was no exception thrown from the pool with SR6):

java.lang.IllegalArgumentException: Too many permits returned: returned=1, would bring to 11/10
  at reactor.pool.AllocationStrategies$SizeBasedAllocationStrategy.returnPermits(AllocationStrategies.java:141)
  at reactor.pool.AbstractPool.destroyPoolable(AbstractPool.java:147)
  at reactor.pool.SimpleDequePool.drainLoop(SimpleDequePool.java:310)
  at reactor.pool.SimpleDequePool.drain(SimpleDequePool.java:204)
  at reactor.pool.SimpleDequePool.doAcquire(SimpleDequePool.java:199)
  at reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:378)
  at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:130)

I have also added logging around the acquisition and close of connections, both over and under the pool. My logging shows this:

2020-10-01 22:23:03.844 DEBUG 852 --- [tor-tcp-epoll-1] c.n.b.e.p.LoggingConnectionFactory       : created CloseLoggingConnection{name='query/reader (over pool) 41', delegate=PooledConnection[CloseLoggingConnection{name='query/reader (under pool) 4', delegate=PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@261617c5, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@4cb7990f}}]}
2020-10-01 22:23:03.851 DEBUG 852 --- [tor-tcp-epoll-1] c.n.b.e.p.LoggingConnectionFactory       : closed CloseLoggingConnection{name='query/reader (over pool) 41', delegate=PooledConnection[CloseLoggingConnection{name='query/reader (under pool) 4', delegate=PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@261617c5, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@4cb7990f}}]} (onComplete)
2020-10-01 22:23:17.116 DEBUG 852 --- [tor-tcp-epoll-1] c.n.b.e.p.LoggingConnectionFactory       : created CloseLoggingConnection{name='query/reader (over pool) 51', delegate=PooledConnection[CloseLoggingConnection{name='query/reader (under pool) 4', delegate=PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@261617c5, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@4cb7990f}}]}
2020-10-01 22:23:17.166 DEBUG 852 --- [tor-tcp-epoll-1] c.n.b.e.p.LoggingConnectionFactory       : closed CloseLoggingConnection{name='query/reader (over pool) 51', delegate=PooledConnection[CloseLoggingConnection{name='query/reader (under pool) 4', delegate=PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@261617c5, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@4cb7990f}}]} (onComplete)
2020-10-01 22:23:17.179 DEBUG 852 --- [tor-tcp-epoll-1] c.n.b.e.p.LoggingConnectionFactory       : created CloseLoggingConnection{name='query/reader (over pool) 62', delegate=PooledConnection[CloseLoggingConnection{name='query/reader (under pool) 4', delegate=PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@261617c5, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@4cb7990f}}]}
2020-10-01 22:23:17.225 DEBUG 852 --- [tor-tcp-epoll-1] c.n.b.e.p.LoggingConnectionFactory       : created CloseLoggingConnection{name='query/reader (over pool) 70', delegate=PooledConnection[CloseLoggingConnection{name='query/reader (under pool) 4', delegate=PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@261617c5, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@4cb7990f}}]}
2020-10-01 22:23:19.297 DEBUG 852 --- [tor-tcp-epoll-1] c.n.b.e.p.LoggingConnectionFactory       : closed CloseLoggingConnection{name='query/reader (over pool) 62', delegate=PooledConnection[CloseLoggingConnection{name='query/reader (under pool) 4', delegate=PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@261617c5, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@4cb7990f}}]} (onComplete)
2020-10-01 22:23:19.302 DEBUG 852 --- [tor-tcp-epoll-1] c.n.b.e.p.LoggingConnectionFactory       : closed CloseLoggingConnection{name='query/reader (over pool) 70', delegate=PooledConnection[CloseLoggingConnection{name='query/reader (under pool) 4', delegate=PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@261617c5, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@4cb7990f}}]} (onComplete)

"under pool 4" is the database connection. "over pool 41" and "over pool 51" both acquire and release it as expected; however, "over pool 62" and "over pool 70" are then provided the same underlying database connection simultaneously.

I am unable to reproduce this in my test suite, but it does consistently happen when running my application in a deployed environment.

There doesn't seem to be any additional logging I can enable in either r2dbc-pool or reactor-pool to help debug. I've temporarily disabled r2dbc-pool in my application, and several observed "weird problems" have now gone away.

I'm open to suggestions on how to debug this further.

Simplify package hierarchy and dependencies further

  • metrics package can be removed (along with the dependency on HdrHistogram in the main sourceset)
    • NoOpsMetricsRecorder can be made a private inner class of the PoolBuilder
    • InMemoryPoolMetrics can be made a test class
  • impl package can be fused with the pool root package

Split metadata out of `PooledRef`

PooledRef should only directly expose the methods that are most relevant to the majority of users: release(), invalidate() (to a lesser extent, but that's the logical place to put it) and poolable().

Additional methods that return metrics about the PooledRef should be hidden away in a separate interface. Since these can be viewed as metadata for the reference (which could grow beyond simple metrics to include some state), I suggest PooledRefMetadata.

The metadata would be accessible through a fourth method, PooledRef#metadata().

In order to limit garbage, internal concrete implementations of PooledRef should directly implement PooledRefMetadata and have metadata() return a view of this.
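A sketch of the proposed shape (signatures simplified; the real metadata could grow beyond these example methods):

import reactor.core.publisher.Mono;

interface PooledRefMetadata {
    long acquireCount();
    long idleTime();
    long lifeTime();
}

interface PooledRef<T> {
    T poolable();
    Mono<Void> release();
    Mono<Void> invalidate();
    PooledRefMetadata metadata();
}

// internal concrete refs implement both interfaces, so metadata()
// can return 'this' and no extra object is allocated
abstract class AbstractPooledRef<T> implements PooledRef<T>, PooledRefMetadata {
    @Override
    public PooledRefMetadata metadata() {
        return this;
    }
}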

Shade JCTools

There is no sense in exposing a dependency on JCTools to users, so shading it would make sense.

Should we keep AllocationStrategy#estimatePermitCount?

The AllocationStrategy was introduced to replace an AtomicIntegerFieldUpdater LIVE + a final int maxSize cap. estimatePermitCount was intended as a way to provide an equivalent to LIVE.get(), with the same limitations in terms of atomicity.

I don't think there is a race condition in the constructor, unless the AllocationStrategy is reused (which it shouldn't be): nothing can call getPermits until the initSize call at line 66.

But maybe the method should be replaced with getPermitCap()? More advanced strategies not purely based on a counter could still return Long.MAX_VALUE to indicate that they have a different mode of granting permits 🤔

Originally discussed in #4 (comment)

Deprecate lifo() vs fifo() distinction, preparing to remove lifo() flavor

Motivation

The lifo() flavor was introduced after misinterpreting a requirement, since clarified in #87.
I don't think it is actually that terribly useful to serve pending acquire() in a LIFO order.

If you have an actual use case for LIFO order of acquire (and not of idle resources), please comment on this issue.

As a reminder, the lifo() flavor means that when the pool is empty and no more resources can be created, an acquire is parked in a stack instead of a queue. And so, the next time a resource is released, the most recently parked acquire will be unparked and served.

Add disposeLater() method for non-blocking cleanup

Pool should expose a Mono<Void> disposeLater() method to initiate a non-blocking shutdown, with the ability to await cleanup. This is useful for scenarios in which a pool gets created/disposed using non-blocking flows.
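A brief usage sketch of the proposed method, chained into a wider non-blocking teardown (otherTeardown stands in for whatever else must be cleaned up):

import reactor.core.publisher.Mono;
import reactor.pool.Pool;

final class ShutdownFlow {
    static Mono<Void> shutdown(Pool<?> pool, Mono<Void> otherTeardown) {
        // dispose the pool first, then run the rest of the teardown,
        // all without blocking any thread
        return pool.disposeLater().then(otherTeardown);
    }
}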

Expose the release and allocation "timestamps" on `PooledRefMetadata`

Motivation

On top of ages, which are a function of the current wall-clock time, exposing timestamps (or numbers with a similar property) could be useful when one needs a more absolute ordering.

Desired solution

Add allocationTimestamp() and releaseTimestamp() to PooledRefMetadata.

Additional context

This might prove a pre-requisite for #88.

Idle time should not depend on metrics recorder

Expected behavior

Idle time should not depend on metrics recorder

Actual behavior

The current implementation uses the metrics recorder to calculate the idle time:

void markReleased() {
    if (STATE.compareAndSet(this, STATE_ACQUIRED, STATE_RELEASED)) {
        this.timeSinceRelease = metricsRecorder.now();
    }
}

When there is no metrics recorder, reactor.pool.NoOpPoolMetricsRecorder is used,
where now() and measureTime(long startTimeMillis) always return 0.

This change works for me:

diff --git a/src/main/java/reactor/pool/NoOpPoolMetricsRecorder.java b/src/main/java/reactor/pool/NoOpPoolMetricsRecorder.java
index 532b4f8..419f7b6 100644
--- a/src/main/java/reactor/pool/NoOpPoolMetricsRecorder.java
+++ b/src/main/java/reactor/pool/NoOpPoolMetricsRecorder.java
@@ -30,12 +30,7 @@ final class NoOpPoolMetricsRecorder implements PoolMetricsRecorder {
 
     @Override
     public long now() {
-        return 0L;
-    }
-
-    @Override
-    public long measureTime(long startTimeMillis) {
-        return 0;
+        return System.currentTimeMillis();
     }
 
     @Override
diff --git a/src/main/java/reactor/pool/PoolMetricsRecorder.java b/src/main/java/reactor/pool/PoolMetricsRecorder.java
index 437cefb..37bd741 100644
--- a/src/main/java/reactor/pool/PoolMetricsRecorder.java
+++ b/src/main/java/reactor/pool/PoolMetricsRecorder.java
@@ -32,7 +32,9 @@ public interface PoolMetricsRecorder {
 	 * @param startTimeMillis the starting time initially obtained via {@link #now()}
 	 * @return the elapsed time in milliseconds
 	 */
-	long measureTime(long startTimeMillis);
+	default long measureTime(long startTimeMillis) {
+		return now() - startTimeMillis;
+	}
 
 	/**
 	 * Get a starting time with milliseconds resolution.

Steps to reproduce

Reactor Pool version

current snapshot version

JVM version (e.g. java -version)

OS version (e.g. uname -a)

Remove `EvictionPredicates`

It seems only the "time since idle" kind of predicate has a direct and obvious application in most use cases we can think of.

Since the pool can always be configured with an arbitrary Predicate, and we'd still make metrics like age since allocation and number of acquisitions available to such a predicate, we can remove this class and only offer evictionPredicate(Predicate) + evictionIdle(Duration ttl).

Inconsistent Builder sizeMin(…).sizeMax(…) behavior

Expected behavior

Calling PoolBuilder.sizeMin(min).sizeMax(max) should retain min and max values.

Actual behavior

Calling PoolBuilder.sizeMin(min).sizeMax(max) discards the min value and requires calling sizeBetween(…) instead.

This behavior isn't obvious from the method naming, and the Javadoc also does not reflect the current behavior.
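A toy model of the reported pitfall; the internals are a guess for illustration only (each size setter installs a whole bounds configuration, so the later call replaces rather than merges with the earlier one):

final class ToyPoolBuilder {
    private int[] bounds = {0, Integer.MAX_VALUE}; // {min, max}

    ToyPoolBuilder sizeMin(int min) { this.bounds = new int[]{min, Integer.MAX_VALUE}; return this; }
    ToyPoolBuilder sizeMax(int max) { this.bounds = new int[]{0, max}; return this; }
    ToyPoolBuilder sizeBetween(int min, int max) { this.bounds = new int[]{min, max}; return this; }

    public static void main(String[] args) {
        ToyPoolBuilder chained = new ToyPoolBuilder().sizeMin(2).sizeMax(10);
        System.out.println("sizeMin(2).sizeMax(10) -> min=" + chained.bounds[0]); // 0: min discarded
        ToyPoolBuilder explicit = new ToyPoolBuilder().sizeBetween(2, 10);
        System.out.println("sizeBetween(2, 10)     -> min=" + explicit.bounds[0]); // 2: both retained
    }
}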

Reactor Pool version

0.0.1.M3

When allocator fails to create a resource, acquire operation hangs for the pending requests

Expected Behavior

Acquire operation for pending requests does not hang

Actual Behavior

Consider a pool with 1 max connection and 2 parallel requests: for the first one the allocator is invoked to create a resource, while the second one waits.
When the allocator fails to create the resource, the first request correctly receives the error (the pool holds no resources, since creation failed).
The expectation is that the allocator is invoked again to create a new resource for the second, pending request; however this does not happen, and the second request hangs forever.

There is no drain invocation when the allocator fails to create a resource:

allocator.subscribe(newInstance -> borrower.deliver(createSlot(newInstance)),
        error -> {
            metricsRecorder.recordAllocationFailureAndLatency(clock.millis() - start);
            ACQUIRED.decrementAndGet(this);
            poolConfig.allocationStrategy().returnPermits(1);
            borrower.fail(error);
        },
        () -> metricsRecorder.recordAllocationSuccessAndLatency(clock.millis() - start));

Possible Solution

Invoke drain() when the allocator fails to create a resource, as sketched below.
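A sketch of that fix applied to the fragment above (same non-self-contained snippet, with one added call):

allocator.subscribe(newInstance -> borrower.deliver(createSlot(newInstance)),
        error -> {
            metricsRecorder.recordAllocationFailureAndLatency(clock.millis() - start);
            ACQUIRED.decrementAndGet(this);
            poolConfig.allocationStrategy().returnPermits(1);
            borrower.fail(error);
            drain(); // let the next pending borrower trigger a fresh allocation
        },
        () -> metricsRecorder.recordAllocationSuccessAndLatency(clock.millis() - start));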

Compile a list of potential pool metrics

The metrics could either be directly added to PoolMetricsRecorder as a single recording method, or as several methods that would allow computing that metric.

Also, do we need to come up with something gauge-like (i.e. a way to pull a metric from the Pool instance rather than letting the Pool trigger a record)?

NullPointerException race condition when shutting down fifo pool

Expected Behavior

Shutting down the pool should finish successfully.

Actual Behavior

Occasionally an NPE is thrown while failing FIFO pending borrowers.

Steps to Reproduce

@ParameterizedTest
@MethodSource("allPools")
void raceShutdownAndAcquireInvalidate(Function<PoolBuilder<AtomicInteger, ?>, AbstractPool<AtomicInteger>> configAdjuster) {
	AtomicInteger ai = new AtomicInteger();
	PoolBuilder<AtomicInteger, PoolConfig<AtomicInteger>> configBuilder = PoolBuilder
			.from(Mono.fromSupplier(() -> {
				ai.incrementAndGet();
				return ai;
			}))
			.evictionIdle(Duration.ZERO)
			.destroyHandler(resource -> Mono.fromRunnable(resource::decrementAndGet))
			.sizeBetween(0, 1);

	AtomicReference<Throwable> errorRef = new AtomicReference<>();
	for (int i = 0; i < 100_000; i++) {
		errorRef.set(null);
		InstrumentedPool<AtomicInteger> pool = configAdjuster.apply(configBuilder);

		if (i % 2 == 0) {
			RaceTestUtils.race(
					() -> pool.disposeLater().subscribe(v -> {}, errorRef::set),
					() -> pool.acquire()
					          .flatMap(PooledRef::invalidate)
					          .onErrorResume(PoolShutdownException.class, e -> Mono.empty())
					          .subscribe(v -> {}, errorRef::set)
			);
		}
		else {
			RaceTestUtils.race(
					() -> pool.acquire()
					          .flatMap(PooledRef::invalidate)
					          .onErrorResume(PoolShutdownException.class, e -> Mono.empty())
					          .subscribe(v -> {}, errorRef::set),
					() -> pool.disposeLater().subscribe(v -> {}, errorRef::set)
			);
		}
		if (errorRef.get() != null) {
			errorRef.get().printStackTrace();
		}
		assertThat(errorRef.get()).as("exception in " + configAdjuster.toString() + " iteration " + i).isNull();
	}
	assertThat(ai).as("creates and destroys stabilizes to 0").hasValue(0);
}
[exception in simplePool FIFO iteration 16313] 
Expecting:
 <java.lang.NullPointerException>
to be equal to:
 <null>
but was not.

Possible Solution

The PENDING queue can occasionally produce a null from poll() despite isEmpty() returning false:

while (!q.isEmpty()) {
    q.poll().fail(new PoolShutdownException());
}

Instead of double-reading, one can simply poll() and check for null:

Borrower<POOLABLE> nextPending;
while ((nextPending = q.poll()) != null) {
    nextPending.fail(new PoolShutdownException());
}

Acquire always fails when about to reach maxPending, even if pool has idle resource / allocation capacity

Expected Behavior

Setting maxPending to n fails the n-th acquire only if there is no idle resource AND no capacity in the pool for allocating a new resource.

Setting maxPending(0) would fail without ever "parking" an acquire, but acquire should be possible if the pool has idle resources or capacity to allocate.

Actual Behavior

Setting maxPending(0) turns every acquire into a failure, even if there is an idle resource, or some capacity to allocate.

Similarly, setting it higher results in an off-by-one error where the pool should allow an allocation to take place.

Steps to Reproduce

	@ParameterizedTest
	@MethodSource("allPools")
	void maxPendingZero(Function<PoolBuilder<Integer, ?>, AbstractPool<Integer>> configAdjuster) {
		AtomicInteger source = new AtomicInteger();
		PoolBuilder<Integer, PoolConfig<Integer>> configBuilder = PoolBuilder
				.from(Mono.fromSupplier(source::incrementAndGet))
				.sizeBetween(1, 2)
				.maxPendingAcquire(0);

		AbstractPool<Integer> pool = configAdjuster.apply(configBuilder);

		assertThat(pool.warmup().block(Duration.ofSeconds(1))).as("warmup").isOne();

		//there is one idle resource
		assertThat(pool.acquire().block(Duration.ofSeconds(1)))
				.as("acquire on idle")
				.isNotNull()
				.hasFieldOrPropertyWithValue("poolable", 1);

		//there is no idle resource left, but still capacity
		assertThat(pool.acquire().block(Duration.ofSeconds(1)))
				.as("acquire on allocate")
				.isNotNull()
				.hasFieldOrPropertyWithValue("poolable", 2);

		//there is now no idle resource and no more capacity
		assertThatExceptionOfType(PoolAcquirePendingLimitException.class)
				.isThrownBy(() -> pool.acquire().block(Duration.ofSeconds(1)))
				.as("acquire on maxPending")
				.withMessage("No pending allowed and pool has reached allocation limit");
	}

	@ParameterizedTest
	@MethodSource("allPools")
	void maxPendingOne(Function<PoolBuilder<Integer, ?>, AbstractPool<Integer>> configAdjuster)
			throws InterruptedException {
		AtomicInteger source = new AtomicInteger();
		PoolBuilder<Integer, PoolConfig<Integer>> configBuilder = PoolBuilder
				.from(Mono.fromSupplier(source::incrementAndGet))
				.sizeBetween(1, 2)
				.maxPendingAcquire(1);

		AbstractPool<Integer> pool = configAdjuster.apply(configBuilder);

		assertThat(pool.warmup().block(Duration.ofSeconds(1))).as("warmup").isOne();

		//there is one idle resource
		assertThat(pool.acquire().block(Duration.ofSeconds(1)))
				.as("acquire on idle")
				.isNotNull()
				.hasFieldOrPropertyWithValue("poolable", 1);

		//there is no idle resource left, but still capacity
		assertThat(pool.acquire().block(Duration.ofSeconds(1)))
				.as("acquire on allocate")
				.isNotNull()
				.hasFieldOrPropertyWithValue("poolable", 2);

		//there is now no idle resource and no capacity, but 1 pending is allowed
		//in order to test the scenario where we've reached maxPending, this pending must not be cancelled
		final CountDownLatch latch = new CountDownLatch(1);
		final AtomicReference<Throwable> endingSignal = new AtomicReference<>();
		pool.acquire().timeout(Duration.ofSeconds(1))
		    .doFinally(s -> {
		    	latch.countDown();
		    })
		    .subscribe(v -> {}, endingSignal::set);

		//there is now no idle resource, no capacity, and we've reached maxPending
		assertThatExceptionOfType(PoolAcquirePendingLimitException.class)
				.isThrownBy(() -> pool.acquire().block(Duration.ofSeconds(1)))
				.as("pending acquire on maxPending")
				.withMessage("Pending acquire queue has reached its maximum size of 1");

		//post-assert the intermediate
		assertThat(latch.await(2, TimeUnit.SECONDS)).as("pending 1 was parked for 1s").isTrue();
		assertThat(endingSignal.get()).as("pending 1 timed out after 1s").hasMessageStartingWith("Did not observe any item or terminal signal within 1000ms");
	}

Possible Solution

Check the idleResource size and allocationStrategy in SimpleDequePool#pendingOffer.

Replace generic RuntimeException with pool-specific exceptions

A prime example is the generic RuntimeException propagated to pending monos in disposeLater, which could benefit from being specific (e.g. PoolShutdownException).

Maybe do the same for the IllegalStateException that is thrown when the max pending queue size is reached?

Bump to Gradle 6.5.1 and add japicmp

  • Bump to Gradle 6.5.1

    • remove OsgiHelper import and usage
    • bump artifactory to 4.15.2
    • add bnd plugin id 'biz.aQute.bnd.builder' version '5.0.1' apply false
    • avoid generating module metadata (see snippet below)
    • apply plugin: 'biz.aQute.bnd.builder'
    • replace bundleImportPackages with a bndOptions string[][] array (see below)
    • configure bnd with the bndOptions in the jar task: bnd(bndOptions)
  • add plugin id "me.champeau.gradle.japicmp" version "0.2.6"

    • add plugin id "de.undercouch.download" version "3.4.3"
    • check.dependsOn japicmp
    • add downloadBaseline and japicmp tasks (without the Rule, only simplified exclusions)
    • add compatibleVersion=0.1.0.RELEASE to gradle.properties

To avoid generating module metadata:

tasks.withType(GenerateModuleMetadata) {
  enabled = false
}

bnd options:

  ext.bndOptions = [
    "Export-Package": [
      "!*internal*",
      "reactor.*"
    ].join(","),
    "Import-Package": [
      "!javax.annotation",
      "*"
    ].join(","),
    "Bundle-Name" : "reactor-pool",
    "Bundle-SymbolicName" : "io.projectreactor.addons.reactor-pool"
  ]

Provide a way to limit the number of pending borrowers

As a second bulkhead against crashing a system due to resource exhaustion, let the user provide an upper limit to the number of pending borrowers a Pool can have. Thus:

  • past the allocationStrategy permits, no new resource would be created, and
  • past the maxPendingQueueSize, pool.acquire().subscribe() would fail fast.

Cancellation of `acquire()` Mono should remove it from the pending list

Currently, when a subscribed acquire() Mono is cancelled, it remains visible to its associated pool, which still considers it pending. When a resource becomes available, the cancelled acquire could thus be elected as the next receiver of said resource, in which case it would immediately release the resource back. This is correct, but wasteful.

Provided the pending data structures used internally all support arbitrary item removal, this could be optimized by having the borrower's cancel() method remove itself from the pool's pending collection (while atomically marking itself as cancelled, in case of a race between cancellation and resource availability).

Note that the naive internal TreiberStack doesn't support such arbitrary removals, and neither do the reactor-core Queue implementations. However, JCTools queues do support removal, and ConcurrentLinkedDeque is probably a good stack alternative that does as well.

Remove blocking allocations in the pool constructor and add a "warmup" API

initialSize only makes sense if the instances are created within the boundaries of the constructor. But that implies blocking on the allocator Mono, which is undesirable (see discussion here).

Instead, a mechanism to increase the number of (idle) objects in the pool can be provided.

This would allocate new objects, given enough permits from the allocationStrategy, without blocking.

Proposed API:

/**
 * @param desired the desired number of new idle poolables to allocate, within the pool's limit
 * @return a Mono representing the actual number of allocated poolables, which might be less than the desired amount (down to 0) if the pool has a maximum size. The Mono completes when allocations have completed.
 */
Mono<Integer> growIdle(int desired);

Warmup should not decrement ACQUIRE and should run after main attempt

This is especially visible when warmup is combined with an allocator that errors.

See #83: when running allocatorErrorInAcquireDrains_WithMinSize in a loop, the test sometimes fails because an onComplete() is emitted rather than an onError().

After investigation, there are two issues with the warmup code:

  • it shouldn't decrement ACQUIRE (which is only incremented once)
  • it should run after the primary allocation

Remove `AllocationStrategies`

The two strategies can be made part of the PoolBuilder:

 allocationStrategy(AllocationStrategy arbitraryStrategy);
 sizeUnbounded();
 sizeMax(int maxPoolSize);

Introduce a timeout notion JUST for the pending acquire case

Calling Pool#acquire().timeout() (the vanilla timeout operator) introduces a timeout that can effectively trigger in two cases:

  • the pool is depleted and cannot allocate more resources: timeout on pending acquire (time waiting for a resource to be released)
  • the pool is depleted BUT can allocate new resources: timeout on the allocation of one new resource

It appears that in some cases resource allocation can take a more or less arbitrary time that we are OK waiting for if we know we'll end up with a resource, but we don't want to wait more than necessary if we know we can't allocate more resources.

i.e. we want a timeout on the pending case only.
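For contrast, a sketch of today's workaround versus the desired knob (pendingAcquireTimeout is a hypothetical builder option, named here purely for illustration):

import java.time.Duration;
import reactor.core.publisher.Mono;
import reactor.pool.Pool;
import reactor.pool.PooledRef;

final class TimeoutStyles {
    // today: the vanilla operator times out BOTH the pending wait
    // and a possibly slow (but certain-to-succeed) allocation
    static <T> Mono<PooledRef<T>> vanilla(Pool<T> pool) {
        return pool.acquire().timeout(Duration.ofSeconds(5));
    }

    // desired: a knob that only limits the time parked as a pending
    // borrower, e.g. (hypothetical):
    //   PoolBuilder.from(allocator).pendingAcquireTimeout(Duration.ofSeconds(5))
}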
