Coder Social home page Coder Social logo

davidmoten / rxjava2-jdbc Goto Github PK

View Code? Open in Web Editor NEW
389.0 389.0 41.0 1.43 MB

RxJava2 integration with JDBC including Non-blocking Connection Pools

License: Apache License 2.0

Java 99.81% Shell 0.03% Kotlin 0.15%
concurrency database java jdbc reactive reactive-programming reactive-streams rxjava

rxjava2-jdbc's Introduction

rxjava2-jdbc

How to build

Use maven:

mvn clean install

The project builds fine on Java 8 to 17.

Getting started

Use this maven dependency:

<dependency>
  <groupId>com.github.davidmoten</groupId>
  <artifactId>rxjava2-jdbc</artifactId>
  <version>VERSION_HERE</version>
</dependency>

If you want to use the built-in test database then add the Apache Derby dependency (otherwise you’ll need the jdbc dependency for the database you want to connect to):

<dependency>
  <groupId>org.apache.derby</groupId>
  <artifactId>derby</artifactId>
  <version>10.14.2.0</version>
</dependency>

Database

To start things off you need a Database instance. Given the jdbc url of your database you can create a Database object like this:

Database db = Database.from(url, maxPoolSize);

Support for playing with rxjava2-jdbc!

If you want to have a play with a built-in test database then do this:

Database db = Database.test(maxPoolSize);

To use the built-in test database you will need the Apache Derby dependency:

<dependency>
  <groupId>org.apache.derby</groupId>
  <artifactId>derby</artifactId>
  <version>10.14.2.0</version>
</dependency>

The test database has a few tables (see script) including Person and Address with three rows in Person and two rows in Address:

schema

Each time you call Database.test(maxPoolSize) you will have a fresh new database to play with that is loaded with data as described above.

A query example

Let’s use a Database instance to perform a select query on the Person table and write the names to the console:

Database db = Database.test();
db.select("select name from person")
  .getAs(String.class)
  .blockingForEach(System.out::println);

Output is

FRED
JOSEPH
MARMADUKE

Note the use of blockingForEach. This is only for demonstration purposes. When a query is executed it is executed asynchronously on a scheduler that you can specify if desired. The default scheduler is:

Schedulers.from(Executors.newFixedThreadPool(maxPoolSize));

While you are processing reactively you should avoid blocking calls but domain boundaries sometimes force this to happen (e.g. accumulate the results and return them as xml over the network from a web service call). Bear in mind also that if you are worried about the complexities of debugging RxJava programs then you might wish to make brief limited forays into reactive code. That’s completely fine too. What you lose in efficiency you may gain in simplicity.

Asynchrony

The query flowables returned by the Database all run asynchronously. This is required because of the use of non-blocking connection pools. When a connection is returned to the pool and then checked-out by another query that checkout must occur on a different thread so that stack overflow does not occur. See the Non-blocking connection pools section for more details.

Nulls

RxJava2 does not support streams of nulls. If you want to represent nulls in your stream then use java.util.Optional.

In the special case where a single nullable column is being returned and mapped to a class via getAs you should instead use getAsOptional:

Database.test()
  .select("select date_of_birth from person where name='FRED'")
  .getAsOptional(Instant.class)
  .blockingForEach(System.out::println);

Output:

Optional.empty

Nulls will happily map to Tuples (see the next section) when you have two or more columns.

Null parameters

You can specify an explicit null parameter like this:

Database.test()
  .update("update person set date_of_birth = ?")
  .parameter(null)
  .counts()
  .blockingForEach(System.out::println);

or using named parameters:

Database.test()
  .update("update person set date_of_birth = :dob")
  .parameter(Parameter.create("dob", null))
  .counts()
  .blockingForEach(System.out::println);

If you use a stream of parameters then you have to be more careful (nulls are not allowed in RxJava streams):\

Database.test()
  .update("update person set date_of_birth = ?")
  .parameterStream(Flowable.just(Parameter.NULL))
  .counts()
  .blockingForEach(System.out::println);

Tuple support

When you specify more types in the getAs method they are matched to the columns in the returned result set from the query and combined into a Tuple instance. Here’s an example that returns Tuple2:

Database db = Database.test();
db.select("select name, score from person")
  .getAs(String.class, Integer.class)
  .blockingForEach(System.out::println);

Output

Tuple2 [value1=FRED, value2=21]
Tuple2 [value1=JOSEPH, value2=34]
Tuple2 [value1=MARMADUKE, value2=25]

Tuples are defined from Tuple2 to Tuple7 and for above that to TupleN.

Manual mapping

You can map each row of the JDBC ResultSet to your own object using the get method:

Database db = Database.test();
db.select("select name, score from person order by name")
  .get(rs -> new Person(rs.getString("name"), rs.getInt("score")))
  .doOnNext(System.out::println) //
  .subscribe();

By the way it is definitely not a good idea to hang on to a bunch of rs objects as their state will always be that of the latest read row. For this reason you immediately map it to something else either by manually mapping or automapping.

Automap

To map the result set values to an interface, first declare an interface:

interface Person {
  @Column
  String name();

  @Column
  int score();
}

In the query use the autoMap method and let’s use some of the built-in testing methods of RxJava2 to confirm we got what we expected:

Database db = Database.test();
db.select("select name, score from person order by name")
  .autoMap(Person.class)
  .doOnNext(System.out::println)
  .firstOrError()
  .map(Person::score)
  .test()
  .assertValue(21)
  .assertComplete();

If your interface method name does not exactly match the column name (underscores and case are ignored) then you can add more detail to the Column annotation:

interface Person {
  @Column("name")
  String fullName();

  @Column("score")
  int examScore();
}

You can also refer to the 1-based position of the column in the result set instead of its name:

interface Person {
  @Index(1)
  String fullName();

  @Index(2)
  int examScore();
}

In fact, you can mix use of named columns and indexed columns in automapped interfaces.

If you don’t configure things correctly these exceptions may be emitted and include extra information in the error message about the affected automap interface:

  • AnnotationsNotFoundException

  • ColumnIndexOutOfRangeException

  • ColumnNotFoundException

  • ClassCastException

  • AutomappedInterfaceInaccessibleException

Automapped toString

The toString() method is implemented for automapped objects. For example the toString method for a Person object produces something like:

Person[name=FRED, score=21]

Automapped equals/hashCode

The equals and hashCode methods on automapped objects have been implemented based on method value comparisons. For example

  • Person[name=FRED, score=21] is equal to Person[name=FRED, score=21]

  • Person[name=FRED, score=21] is not equal to Person[name=FRED, score=22]

  • Person[name=FRED, score=21] is not equal to Person2[name=FRED, score=21]

Note that if you try to compare an automapped object with a custom implementation of the automapped interface then the custom implementation must implement equals/hashCode in the same way. In short, avoid doing that!

Automapped interface with default methods

  • Java 8 - Calling a default method on an automapped interface is supported provided the interface is public and you use the default SecurityManager.

  • Java 9 - not supported yet (TODO)

Automap with annotated query

The automapped interface can be annotated with the select query:

@Query("select name, score from person order by name")
interface Person {
   @Column
   String name();

   @Column
   int score();
}

To use the annotated interface:

Database
  .test()
  .select(Person.class)
  .get()
  .map(Person::name)
  .blockingForEach(System.out::println);

Output:

FRED
JOSEPH
MARMADUKE

In fact the .map is not required if you use a different overload of get:

Database
  .test()
  .select(Person.class)
  .get(Person::name)
  .blockingForEach(System.out::println);

Automap with Kotlin

See example. Below is how you annotate an interface in Kotlin for automap:

@Query("select name from person order by name")
interface Person {
    @Column("name")
    fun nm() : String
}

Auto mappings

The automatic mappings below of objects are used in the autoMap() method and for typed getAs() calls. * java.sql.Date,java.sql.Time,java.sql.Timestamp ⇐⇒ java.util.Date * java.sql.Date,java.sql.Time,java.sql.Timestamp =⇒ java.lang.Long * java.sql.Date,java.sql.Time,java.sql.Timestamp =⇒ java.time.Instant * java.sql.Date,java.sql.Time,java.sql.Timestamp =⇒ java.time.ZonedDateTime * java.sql.Blob ⇐⇒ java.io.InputStream, byte[] * java.sql.Clob ⇐⇒ java.io.Reader, String * java.math.BigInteger =⇒ Long, Integer, Decimal, Float, Short, java.math.BigDecimal * java.math.BigDecimal =⇒ Long, Integer, Decimal, Float, Short, java.math.BigInteger

Parameters

Parameters are passed to individual queries but can also be used as a streaming source to prompt the query to be run many times.

Parameters can be named or anonymous. Named parameters are not supported natively by the JDBC specification but rxjava2-jdbc does support them.

This is sql with a named parameter:

select name from person where name=:name

This is sql with an anonymous parameter:

select name from person where name=?

Explicit anonymous parameters

In the example below the query is first run with name='FRED' and then name=JOSEPH. Each query returns one result which is printed to the console.

Database.test()
  .select("select score from person where name=?")
  .parameters("FRED", "JOSEPH")
  .getAs(Integer.class)
  .blockingForEach(System.out::println);

Output is:

21
34

Flowable anonymous parameters

You can specify a stream as the source of parameters:

Database.test()
  .select("select score from person where name=?")
  .parameterStream(Flowable.just("FRED","JOSEPH").repeat())
  .getAs(Integer.class)
  .take(3)
  .blockingForEach(System.out::println);

Output is:

21
34
21

Mixing explicit and Flowable parameters

Database.test()
  .select("select score from person where name=?")
  .parameterStream(Flowable.just("FRED","JOSEPH"))
  .parameters("FRED", "JOSEPH")
  .getAs(Integer.class)
  .blockingForEach(System.out::println);

Output is:

21
34
21
34

Multiple parameters per query

If there is more than one parameter per query:

Database.test()
  .select("select score from person where name=? and score=?")
  .parameterStream(Flowable.just("FRED", 21, "JOSEPH", 34).repeat())
  .getAs(Integer.class)
  .take(3)
  .blockingForEach(System.out::println);

or you can group the parameters into lists (each list corresponds to one query) yourself:

Database.test()
  .select("select score from person where name=? and score=?")
  .parameterListStream(Flowable.just(Arrays.asList("FRED", 21), Arrays.asList("JOSEPH", 34)).repeat())
  .getAs(Integer.class)
  .take(3)
  .blockingForEach(System.out::println);

Running a query many times that has no parameters

If the query has no parameters you can use the parameters to drive the number of query calls (the parameter values themselves are ignored):

Database.test()
  .select("select count(*) from person")
  .parameters("a", "b", "c")
  .getAs(Integer.class)
  .blockingForEach(System.out::println);

Output:

3
3
3

Collection parameters

Collection parameters are useful for supplying to IN clauses. For example:

Database.test()
  .select("select score from person where name in (?) order by score")
  .parameter(Sets.newHashSet("FRED", "JOSEPH"))
  .getAs(Integer.class)
  .blockingForEach(System.out::println);

or with named parameters:

Database.test()
  .update("update person set score=0 where name in (:names)")
  .parameter("names", Lists.newArrayList("FRED", "JOSEPH"))
  .counts()
  .blockingForEach(System.out::println);

You need to pass an implementation of java.util.Collection to one of these parameters (for example java.util.List or java.util.Set).

Under the covers rxjava2-jdbc does not use PreparedStatement.setArray because of the patchy support for this method (not supported by DB2 or MySQL for instance) and the extra requirement of specifying a column type.

Note that databases normally have a limit on the number of parameters in a statement (or indeed the size of array that can be passed in setArray). For Oracle it’s O(1000), H2 it is O(20000).

select and update statements are supported as of 0.1-RC23. If you need callable statement support raise an issue.

Non-blocking connection pools

A new exciting feature of rxjava2-jdbc is the availability of non-blocking connection pools.

In normal non-reactive database programming a couple of different threads (started by servlet calls for instance) will race for the next available connection from a pool of database connections. If no unused connection remains in the pool then the standard non-reactive approach is to block the thread until a connection becomes available.

Blocking a thread is a resource issue as each blocked thread holds onto ~0.5MB of stack and may incur context switch and memory-access delays (adds latency to thread processing) when being switched to. For example 100 blocked threads hold onto ~50MB of memory (outside of java heap).

rxjava-jdbc2 uses non-blocking JDBC connection pools by default (but is configurable to use whatever you want). What happens in practice is that for each query a subscription is made to a MemberSingle instance controlled by the NonBlockingConnectionPool object that emits connections when available to its subscribers (first in best dressed). So the definition of the processing of that query is stored on a queue to be started when a connection is available. Adding the Flowable definition of your query to the queue can be quite efficient in terms of memory use compared to the memory costs of thread per query. For example a heap dump of 1000 queued simple select statements from the person table in the test database used 429K of heap. That is 429 bytes per query.

The simplest way of creating a Database instance with a non-blocking connection pool is:

Database db = Database.from(url, maxPoolSize);

If you want to play with the in-memory built-in test database (requires Apache Derby dependency) then:

Database db = Database.test(maxPoolSize);

If you want more control over the behaviour of the non-blocking connection pool:

Database db = Database
  .nonBlocking()
  // the jdbc url of the connections to be placed in the pool
  .url(url)
  // an unused connection will be closed after thirty minutes
  .maxIdleTime(30, TimeUnit.MINUTES)
  // connections are checked for healthiness on checkout if the connection
  // has been idle for at least 5 seconds
  .healthCheck(DatabaseType.ORACLE)
  .idleTimeBeforeHealthCheck(5, TimeUnit.SECONDS)
  // if a connection fails creation then retry after 30 seconds
  .createRetryInterval(30, TimeUnit.SECONDS)
  // the maximum number of connections in the pool
  .maxPoolSize(3)
  .build();

Note that the health check varies from database to database. The following databases are directly supported with DatabaseType instances:

  • DB2

  • Derby

  • HSQLDB

  • H2

  • Informix

  • MySQL

  • Oracle

  • Postgres

  • Microsoft SQL Server

  • SQLite

Demonstration

Lets create a database with a non-blocking connection pool of size 1 only and demonstrate what happens when two queries run concurrently. We use the in-built test database for this one so you can copy and paste this code to your ide and it will run (in a main method or unit test say):

// create database with non-blocking connection pool
// of size 1
Database db = Database.test(1);

// start a slow query
db.select("select score from person where name=?")
  .parameter("FRED")
  .getAs(Integer.class)
   // slow things down by sleeping
  .doOnNext(x -> Thread.sleep(1000))
   // run in background thread
  .subscribeOn(Schedulers.io())
  .subscribe();

// ensure that query starts
Thread.sleep(100);

// query again while first query running
db.select("select score from person where name=?")
  .parameter("FRED")
  .getAs(Integer.class)
  .doOnNext(x -> System.out.println("emitted on " + Thread.currentThread().getName()))
  .subscribe();

System.out.println("second query submitted");

// wait for stuff to happen asynchronously
Thread.sleep(5000);

The output of this is

second query submitted
emitted on RxCachedThreadScheduler-1

What has happened is that

  • the second query registers itself as something that will run as soon as a connection is released (by the first query).

  • no blocking occurs and we immediately see the first line of output

  • the second query runs after the first

  • in fact we see that the second query runs on the same Thread as the first query as a direct consequence of non-blocking design

Large objects support

Blobs and Clobs are straightforward to handle.

Insert a Clob

Here’s how to insert a String value into a Clob (document column below is of type CLOB):

String document = ...
Flowable<Integer> count = db
  .update("insert into person_clob(name,document) values(?,?)")
  .parameters("FRED", document)
  .count();

If your document is nullable then you should use Database.clob(document):

String document = ...
Flowable<Integer> count = db
  .update("insert into person_clob(name,document) values(?,?)")
  .parameters("FRED", Database.clob(document))
  .count();

Using a java.io.Reader:

Reader reader = ...;
Flowable<Integer> count = db
  .update("insert into person_clob(name,document) values(?,?)")
  .parameters("FRED", reader)
  .count();

Insert a Null Clob

Flowable<Integer> count = db
  .update("insert into person_clob(name,document) values(?,?)")
  .parameters("FRED", Database.NULL_CLOB)
  .count();

or

Flowable<Integer> count = db
  .update("insert into person_clob(name,document) values(?,?)")
  .parameters("FRED", Database.clob(null))
  .count();

Read a Clob

Flowable<String> document =
  db.select("select document from person_clob")
    .getAs(String.class);

or

Flowable<Reader> document =
  db.select("select document from person_clob")
    .getAs(Reader.class);

Read a Null Clob

For the special case where you want to return one value from a select statement and that value is a nullable CLOB then use getAsOptional:

db.select("select document from person_clob where name='FRED'")
  .getAsOptional(String.class)

Insert a Blob

Similarly for Blobs (document column below is of type BLOB):

byte[] bytes = ...
Flowable<Integer> count = db
  .update("insert into person_blob(name,document) values(?,?)")
  .parameters("FRED", Database.blob(bytes))
  .count();

Insert a Null Blob

This requires either a special call (parameterBlob(String) to identify the parameter as a CLOB:

Flowable<Integer> count = db
  .update("insert into person_blob(name,document) values(?,?)")
  .parameters("FRED", Database.NULL_BLOB)
  .count();

or

Flowable<Integer> count = db
  .update("insert into person_clob(name,document) values(?,?)")
  .parameters("FRED", Database.blob(null))
  .count();

Read a Blob

Flowable<byte[]> document =
  db.select("select document from person_clob")
    .getAs(byte[].class);

or

Flowable<InputStream> document =
  db.select("select document from person_clob")
    .getAs(InputStream.class);

Returning generated keys

If you insert into a table that say in h2 is of type auto_increment then you don’t need to specify a value but you may want to know what value was inserted in the generated key field.

Given a table like this

create table note(
    id bigint auto_increment primary key,
    text varchar(255)
)

This code inserts two rows into the note table and returns the two generated keys:

Flowable<Integer> keys =
    db.update("insert into note(text) values(?)")
      .parameters("hello", "there")
      .returnGeneratedKeys()
      .getAs(Integer.class);

The returnGeneratedKeys method also supports returning multiple keys per row so the builder offers methods just like select to do explicit mapping or auto mapping.

Transactions

Transactions are a critical feature of relational databases.

When we’re talking RxJava we need to consider the behaviour of individual JDBC objects when called by different threads, possibly concurrently. The approach taken by rxjava2-jdbc outside of a transaction safely uses Connection pools (in a non-blocking way). Inside a transaction we must make all calls to the database using the same Connection object so the behaviour of that Connection when called from different threads is important. Some JDBC drivers provide thread-safety on JDBC objects by synchronizing every call.

The safest approach with transactions is to perform all db interaction synchronously. Asynchronous processing within transactions was problematic in rxjava-jdbc because ThreadLocal was used to hold the Connection. Asynchronous processing with transactions is possible with rxjava2-jdbc but should be handled with care given that your JDBC driver may block or indeed suffer from race conditions that most users don’t encounter.

Let’s look at some examples. The first example uses a transaction across two select statement calls:

Database.test()
  .select("select score from person where name=?")
  .parameters("FRED", "JOSEPH")
  .transacted()
  .getAs(Integer.class)
  .blockingForEach(tx ->
    System.out.println(tx.isComplete() ? "complete" : tx.value()));

Output:

21
34
complete

Note that the commit/rollback of the transaction happens automatically.

What we see above is that each emission from the select statement is wrapped with a Tx object including the terminal event (error or complete). This is so you can for instance perform an action using the same transaction.

Let’s see another example that uses the Tx object to update the database. We are going to do something a bit laborious that would normally be done in one update statement (update person set score = -1) just to demonstrate usage:

Database.test()
  .select("select name from person")
  // don't emit a Tx completed event
  .transactedValuesOnly()
  .getAs(String.class)
  .flatMap(tx -> tx
    .update("update person set score=-1 where name=:name")
    .parameter("name", tx.value())
    // don't wrap value in Tx object
    .valuesOnly()
    .counts())
  .toList()
  .blockingForEach(System.out::println);

Output:

[1, 1, 1]

Callable Statements

Callable statement support is a major addition to the code base as of 0.1-RC23.

Callable support is present only outside of transactions (transaction support coming later). If you’re keen for it, raise an issue. The primary impediment is the duplication of a bunch of chained builders for the transacted case.

For example:

Flowable<Tuple2<Integer,Integer>> tuples =
  db.call("call in1out2(?,?,?)")
    .in()
    .out(Type.INTEGER, Integer.class)
    .out(Type.INTEGER, Integer.class)
    .input(0, 10, 20);

Note above that each question mark in the call statement correponds in order with a call to in() or out(…​). Once all parameters have been defined then the input(0, 10, 20) call drives the running of the query with that input. The output Flowable is strongly typed according to the out parameters specified.

When you start specifying output ResultSet s from the call then you lose output parameter strong typing but gain ResultSet mapped strong typing as per normal select statements in rxjava2-jdbc.

Here’s an example for one in parameter and two output ResultSet s with autoMap. You can of course use getAs instead (or get):

Flowable<String> namePairs =
  db
    .call("call in1out0rs2(?)")
    .in()
    .autoMap(Person2.class)
    .autoMap(Person2.class)
    .input(0, 10, 20)
    .flatMap(x ->
      x.results1()
       .zipWith(x.results2(), (y, z) -> y.name() + z.name()));

The above example is pretty nifty in that we can zip the two result sets resulting from the call and of course the whole thing was easy to define (as opposed to normal JDBC).

You just saw autoMap used to handle an output ResultSet but getAs works too:

Flowable<String> namePairs =
  db
    .call("call in1out0rs2(?)")
    .in()
    .getAs(String.class, Integer.class)
    .getAs(String.class, Integer.class
    .input(0, 10, 20)
    .flatMap(x ->
      x.results1()
       .zipWith(x.results2(), (y, z) -> y._1() + z._1()));

You can explore more examples of this in [DatabaseTest.java](rxjava2-jdbc/src/test/java/org/davidmoten/rx/jdbc/DatabaseTest.java). Search for .call.

Using raw JDBC

A few nifty things in JDBC may not yet directly supported by rxjava2-jdbc but you can get acccess to the underlying Connection s from the Database object by using Database.apply or Database.member().

Here’s an example where you want to return something from a Connection (say you called a stored procedure and returned an integer):

Database db = ...
Single<Integer> count =
  db.apply(
     con -> {
       //do whatever you want with the connection
       // just don't close it!
       return con.getHoldability();
     });

If you don’t want to return something then use a different overload of apply:

Completable c =
  db.apply(con -> {
       //do whatever you want with the connection
     });

Here are lower level versions of the above examples where you take on the responsibility of returning the connection to the pool.

Database db = ...
Single<Integer> count = db.member()
  .map(member -> {
     Connection con = member.value();
     try {
       //do whatever you want with the connection
       return count;
     } finally {
       // don't close the connection, just hand it back to the pool
       // and don't use this member again!
       member.checkin();
     });

and

Completable completable = db.member()
  .doOnSuccess(member -> {
     Connection con = member.value();
     try {
       //do whatever you want with the connection
     } finally {
       // don't close the connection, just hand it back to the pool
       // and don't use this member again!
       member.checkin();
     }).ignoreElements();

Logging

Logging is handled by slf4j which bridges to the logging framework of your choice. Add the dependency for your logging framework as a maven dependency and you are sorted. See the test scoped log4j example in rxjava2-jdbc/pom.xml.

rxjava2-jdbc's People

Contributors

davidmoten avatar dependabot[bot] avatar mu4h4h4 avatar praveenmathew92 avatar rai-sandeep avatar ykrkn avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

rxjava2-jdbc's Issues

Exhausted connection pool on Select with transaction.

Versions are 0.2.4, 0.2.5-SNAPSHOT

I wrote simple service to pumping data from databases and that solution requires to runs select with transactions (autoCommit = false)

It seems transacted connections for Selects not closing correctly and this behavior going to exhausing of the pool.

My shorter code snippet below

SelectBuilder sb = db.select("select * from test limit 1").fetchSize(1024);
TransactedSelectBuilder txb = sb.transactedValuesOnly();

Flowable<ExecuteQueryResponseRow> flowableResult = txb
        .get((rs) -> create(rs))
        .map(tx -> tx.value());
return Flux.from(flowableResult);

I did try to use nonBlockingPool and Database.fromBlocking with the same result.
In my opinion the reason is org.davidmoten.rx.jdbc.TransactedSelectBuilder
I did set maxPoolSize = 1 to replay the case. Below the logs to explain the problem.
Database was PostreSQL9.x but i guess this does not matter.

-- prepare select and gets the result
13:04:51.171 [reactor-http-nio-2] DEBUG org.davidmoten.rx.jdbc.SqlInfo - sqlAfterSubs=select * from test limit 1
13:04:51.199 [reactor-http-nio-2] DEBUG org.davidmoten.rx.jdbc.Util - creating new TransactedConnection
13:04:51.200 [reactor-http-nio-2] DEBUG o.d.rx.jdbc.TransactedConnection - constructing TransactedConnection from org.davidmoten.rx.jdbc.pool.internal.PooledConnection@d7f94f2, 1
13:04:51.200 [reactor-http-nio-2] DEBUG org.davidmoten.rx.jdbc.Select - Select.create called with con=TransactedConnection [con=org.davidmoten.rx.jdbc.pool.internal.PooledConnection@d7f94f2, counter=1]
13:04:51.203 [reactor-http-nio-2] DEBUG org.davidmoten.rx.jdbc.SqlInfo - sqlAfterSubs=select * from test limit 1
13:04:51.203 [reactor-http-nio-2] DEBUG org.davidmoten.rx.jdbc.Util - preparing statement: select * from test limit 1
13:04:51.209 [reactor-http-nio-2] DEBUG org.davidmoten.rx.jdbc.Select - parameters=[]
13:04:51.209 [reactor-http-nio-2] DEBUG org.davidmoten.rx.jdbc.Select - names=[]
13:04:51.224 [reactor-http-nio-2] DEBUG org.davidmoten.rx.jdbc.Select - getting row from ps=org.davidmoten.rx.jdbc.TransactedPreparedStatement@47ab9d1a, rs=org.davidmoten.rx.jdbc.TransactedResultSet@75631083
13:04:51.226 [reactor-http-nio-2] DEBUG org.davidmoten.rx.jdbc.Select - emitting ExecuteQueryResponseRow(columns=null, data=[2000-01-01 00:00:00.0])
13:04:51.271 [reactor-http-nio-2] DEBUG org.davidmoten.rx.jdbc.Select - getting row from ps=org.davidmoten.rx.jdbc.TransactedPreparedStatement@47ab9d1a, rs=org.davidmoten.rx.jdbc.TransactedResultSet@75631083
-- tx completed
13:04:51.271 [reactor-http-nio-2] DEBUG org.davidmoten.rx.jdbc.Select - completed
-- close the statement
13:04:51.271 [reactor-http-nio-2] DEBUG org.davidmoten.rx.jdbc.Util - closing org.davidmoten.rx.jdbc.TransactedPreparedStatement@47ab9d1a
-- closing connection will be skipped because the transaction was not commited yet
13:04:51.271 [reactor-http-nio-2] DEBUG org.davidmoten.rx.jdbc.Util - closing TransactedConnection [con=org.davidmoten.rx.jdbc.pool.internal.PooledConnection@d7f94f2, counter=1]
13:04:51.271 [reactor-http-nio-2] DEBUG o.d.rx.jdbc.TransactedConnection - TransactedConnection attempt close
13:04:51.271 [reactor-http-nio-2] DEBUG org.davidmoten.rx.jdbc.Util - closing org.davidmoten.rx.jdbc.TransactedResultSet@75631083
-- commit the tx but connection will not be closed anymore
13:04:51.273 [reactor-http-nio-2] DEBUG o.d.rx.jdbc.TransactedConnection - TransactedConnection commit attempt, counter=1
13:04:51.273 [reactor-http-nio-2] DEBUG o.d.rx.jdbc.TransactedConnection - TransactedConnection actual commit
-- next select waits the connect eternally
13:04:55.856 [reactor-http-nio-3] DEBUG org.davidmoten.rx.jdbc.SqlInfo - sqlAfterSubs=select * from test limit 1

Ok, i watch a codebase.

org.davidmoten.rx.jdbc.TransactedUpdateBuilder
seems ok and connection closing after the tx commited.

.doOnNext(tx -> {
    t[0] = ((TxImpl<Integer>) tx);
}) //
.doOnComplete(() -> {
    TxImpl<?> tx = t[0];
    if (tx.isComplete()) {
        tx.connection().commit();
    }
    Util.closeSilently(tx.connection());
}));

org.davidmoten.rx.jdbc.TransactedSelectBuilder
But here i no see something like that. Hmmm, this is the bug or i do something wrong?

.doOnNext(tx -> {
    if (tx.isComplete()) {
        ((TxImpl<T>) tx).connection().commit();
    }
});

After adding the line all works as expected.

.doOnNext(tx -> {
    if (tx.isComplete()) {
        ((TxImpl<T>) tx).connection().commit();
        ((TxImpl<T>) tx).connection().close();
    }
});

The log for the fixed code.

13:09:26.801 [reactor-http-nio-2] DEBUG org.davidmoten.rx.jdbc.SqlInfo - sqlAfterSubs=select * from test limit 1
-- omit irrelevant details
-- tx completed
13:09:26.894 [reactor-http-nio-2] DEBUG org.davidmoten.rx.jdbc.Select - completed
13:09:26.894 [reactor-http-nio-2] DEBUG org.davidmoten.rx.jdbc.Util - closing org.davidmoten.rx.jdbc.TransactedPreparedStatement@1ca97e02
-- closing connection will be skipped because the transaction was not commited yet
13:09:26.894 [reactor-http-nio-2] DEBUG org.davidmoten.rx.jdbc.Util - closing TransactedConnection [con=org.davidmoten.rx.jdbc.pool.internal.PooledConnection@6d78bf70, counter=1]
13:09:26.895 [reactor-http-nio-2] DEBUG o.d.rx.jdbc.TransactedConnection - TransactedConnection attempt close
13:09:26.895 [reactor-http-nio-2] DEBUG org.davidmoten.rx.jdbc.Util - closing org.davidmoten.rx.jdbc.TransactedResultSet@2bdbe545
-- commit the tx
13:09:26.896 [reactor-http-nio-2] DEBUG o.d.rx.jdbc.TransactedConnection - TransactedConnection commit attempt, counter=1
13:09:26.896 [reactor-http-nio-2] DEBUG o.d.rx.jdbc.TransactedConnection - TransactedConnection actual commit
-- And finally successfully close the connection 
13:09:26.897 [reactor-http-nio-2] DEBUG o.d.rx.jdbc.TransactedConnection - TransactedConnection attempt close
13:09:26.897 [reactor-http-nio-2] DEBUG o.d.rx.jdbc.TransactedConnection - TransactedConnection close

Inserting NULL values

How should NULL values be inserted into nullable columns?
When I pass a Java null via the parameters call I get

java.lang.NullPointerException: Iterator.next() returned a null value 

because a null appears in a flowable stream (note that this is a stream of query parameters, not a query result set).
When I wrap a null with Optional.ofNullable I get

org.postgresql.util.PSQLException: Can't infer the SQL type to use for an instance of java.util.Optional. Use setObject() with an explicit Types value to specify the type to use.

Cannot get live stream of objects using repeat() and autoMap()

Hi david
The automap feature works, thank you for the fix; sorry to be a pain but i am unable to retrieve live objects produced in the database via this code snippet:

db.select( "SELECT RecIndex,Site FROM dbo.MEAS_CARRIER where Time >= ? ORDER BY RecIndex ASC") .parameterStream(Flowable.just(Instant.now()).delay(1, TimeUnit.SECONDS).repeat())
.autoMap(CarrierMeasuremnt.class).distinct();

As you can see i am passing Instant.now() into the flowable and i am repeating it, so i should expect a live stream objects being produced as is the case if i use the get getAs(InputStream.class) function instead of autoMap().
I believe the issue is to with the equals and hashcode methods of the ProxyInstance? Can you please clarify this?

Enhance non-blocking connection pool

Just to mention I'll rewrite portions of the MemberSingle drain loop and NonBlockingConnectionPool to maximize non-blocking. At the moment establishment of a connection blocks the drain loop. It should be scheduled with a subsequent drain call. Might get to release stage in a week or two.

Error when using Database.nonBlocking()

Hello David,

I'm trying to use the Database.nonBlocking().connectionProvider(dataSource).build() but I keep getting this Exception org.davidmoten.rx.pool.PoolClosedException: null.

If I switch to Database.fromBlocking(dataSource) it works fine.

This is my db call

import javax.sql.DataSource;
import org.davidmoten.rx.jdbc.Database;
import org.davidmoten.rx.jdbc.ResultSetMapper;

try (Database db = Database.fromBlocking(dataSource)) {
            return db
                    .select(query)
                    .parameter("param1", param1)
                    .parameter("param2", param2)
                    .parameter("param3", param3)
                    .parameter("param4", param4)
                    .get(new MyObjectMapper());
        }

As always thank you very much for your time!

MySQL connection 8 hours timeout issue

MySQL close idle connections after 8 hours. I have 2 questions regarding this.

  1. With the nonBlocking pool, if I specify healthCheck and idleTimeBeforeHealthCheck, does it mean it will keep the connections alive? for example
Pools.nonBlocking()
                        .connectionProvider(ConnectionProvider.from(url, username, password))
                        .healthCheck(DatabaseType.MYSQL)
                        .idleTimeBeforeHealthCheck(5, TimeUnit.SECONDS)
                        .build();
  1. If I use the blocking pool and pass spring boot auto configured dataSource, will it keep the connection alive? for example:
@Configuration
public class DataSourceConfig {

    @Bean
    public Database db(DataSource ds){
        return Database.fromBlocking(ds);
    }
}

in application.properties

spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://...
spring.datasource.username=username
spring.datasource.password=password

When using NonBlockingConnectionPool the ExecutorService is not shutdown on close

When using Database.nonBlocking() and not providing a custom scheduler the ExecutorService created is never shutdown. This will prevent JVM shutdown. The responsibility to close it lies in the library because it is created there. It is also impossible to access it through getters from the outside so closing it by hand is only possible using some weird reflection.

val db = Database.nonBlocking().build()
// use at least one connection
db.close()
// ExecutorService still running and never stopped

Java 9 TODO Automapped interface

This is not an issue, but had no idea where to drop suggestions

Automapped interface with default methods currentluy does not work in Java 9

In order to work in Java 9, folowing changes should be made :

in org.davidmoten.rx.jdbc.Util#ProxyService

class add additional method

@SuppressWarnings("unchecked")
public T newInstance(ResultSet rs, Class cls) {
return (T) Proxy.newProxyInstance(cls.getClassLoader(), new Class[] { cls }, new ProxyInstance(cls, values()));
}

in org.davidmoten.rx.jdbc.Util class

alter original method that works with java8

static T autoMap(ResultSet rs, Class cls, ProxyService proxyService) {
return proxyService.newInstance();
}

to a modified version that works with java 9

static T autoMap(ResultSet rs, Class cls, ProxyService proxyService) {
return proxyService.newInstance(rs, cls);
}

Testing example that worked in Java 9

Database
.test()
.select(Person.class)
.get()
.map(Person::name)
.blockingForEach(System.out::println);

Not related to this example - another suggestion - make annotation work with sql field aliases
I would be happy to share more ideas for this great great api.

Provide API to set java.sql.Array parameters

Array parameters in JDBC need to be set as java.sql.Array, which need to be instantiated via the java.sql.Connection.

The rxjava2-jdbc API does not allow access to the Connection (or PreparedStatement) when setting paramaters, so arrays cannot be used as parameters.

This could (not actually sure) be implemented wrapping the current parameter logic into some sort of Consumer<PreparedStatement> to be run before executing the query.
@davidmoten I can investigate and send a PR if you are interested in this feature (I'd rather not waste my time if you are not).

Callable Statements null support on input parameters

How can I insert some null parameter through .input(Object...) method call?

The Workings with nulls section doesn't provide an example with Callable Statements.

I'm trying some approaches but nothing works :(

auto mapping with clob data types not working

written in Kotlin

Using this interface to auto map this database structure

interface ExportInfoMapper {

@Column("id")
fun id(): Long?

@Column("userid")
fun userId(): String?

@Column("created")
fun created(): Date?

@Column("modified")
fun modified(): Date?

@Column("raw")
fun raw(): String?

@Column("data_source")
fun dataSource(): String?

}

create table exporter_info (
id bigint GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) primary key,
userid varchar(50) not null,
modified date not null DEFAULT CURRENT_DATE,
created date not null DEFAULT CURRENT_DATE,
raw clob,
data_source clob
);
insert into exporter_info(userid, raw, data_source) values('testUser','{"example": "not sure" }', '{"example": "not sure" }');
insert into exporter_info(userid, raw, data_source) values('testUser','{"example": "not sure" }', '{"example": "not sure" }');
insert into exporter_info(userid, raw, data_source) values('testUser','{"example": "not sure" }', '{"example": "not sure" }');

fails on this expression

    Database
            .test()
            .select("select * from exporter_info")
            .autoMap(ExportInfoMapper::class.java)
            .map {
                println(it)
                it.userId() }
            .test()
            .awaitDone(20, TimeUnit.SECONDS)
            .assertValues("testUser", "testUser", "testUser")
            .assertComplete()

Error receiving

Caused by: ERROR XCL18: Stream or LOB value cannot be retrieved more than once

Is there something I am doing wrong???

Pools insufficient demand to initialize

Hello,

We're having an odd issue when using the pool (see debug output below).

After awhile we start to see database queries just not running (VisualVM confirms no JDBC activity other than the database ping. It looks as though the pool is getting stuck but we're really not sure how/why. It does appear to recover after a period of time

Currently trying to reproduce this in a small simple test

This is the output we get when the database activity stops

09:51:44.065 [mcl-4] DEBUG org.davidmoten.rx.pool.MemberSingle - subscribed
09:51:44.065 [mcl-4] DEBUG org.davidmoten.rx.pool.MemberSingle - drain called
09:51:44.065 [mcl-4] DEBUG org.davidmoten.rx.pool.MemberSingle - drain loop starting
09:51:44.065 [mcl-4] DEBUG org.davidmoten.rx.pool.MemberSingle - requested=2
09:51:44.065 [mcl-4] DEBUG org.davidmoten.rx.pool.MemberSingle - poll of available members returns null
09:51:44.065 [mcl-4] DEBUG org.davidmoten.rx.pool.MemberSingle - scheduling member creation
09:51:44.065 [mcl-4] DEBUG org.davidmoten.rx.pool.MemberSingle - poll of available members returns null
09:51:44.065 [mcl-4] DEBUG org.davidmoten.rx.pool.MemberSingle - insufficient demand to initialize DecoratingMember [value=null]

What we normally get when working

09:51:44.065 [mcl-1] DEBUG org.davidmoten.rx.pool.MemberSingle - subscribed
09:51:44.065 [mcl-1] DEBUG org.davidmoten.rx.pool.MemberSingle - drain called
09:51:44.065 [mcl-1] DEBUG org.davidmoten.rx.pool.MemberSingle - drain loop starting
09:51:44.065 [mcl-1] DEBUG org.davidmoten.rx.pool.MemberSingle - requested=1
09:51:44.065 [mcl-1] DEBUG org.davidmoten.rx.pool.MemberSingle - poll of available members returns DecoratingMember [value=conn21: url=jdbc:h2:tcp://localhost:9123/Primary-SCDBSIM user=SA]
09:51:44.065 [mcl-1] DEBUG org.davidmoten.rx.pool.MemberSingle - trying to emit member
09:51:44.065 [mcl-1] DEBUG org.davidmoten.rx.pool.MemberSingle - schedule.now=1521193904065, lastCheck=1521193904060
09:51:44.065 [mcl-1] DEBUG org.davidmoten.rx.pool.MemberSingle - no health check required for DecoratingMember [value=conn21: url=jdbc:h2:tcp://localhost:9123/dbname user=SA]

Pool configuration

Pools.nonBlocking().url(url).scheduler(Schedulers.from(executorService))
                .maxIdleTime(0, TimeUnit.MINUTES).healthCheck(c -> {
                    try {
                        return c.prepareStatement("select 1").execute();
                    } catch (SQLException e) {
                        LOG.debug("SQLException Occurred whilst invoking a health check on the database", e);
                    }
                    return false;
                }).idleTimeBeforeHealthCheck(1, TimeUnit.SECONDS).maxPoolSize(100).build();

I have tried adjusting the above idle times and pool size but it doesn't appear to have any effect.

The executor service has 10 threads.

Any help/thoughts would be appreciated. We'll keep working to get you a test case for it as well.

Note we are using 0.1-rc19, so going to grab the latest version and test it again..

way to db.update() in a single transaction taking parameterStream() input from another query result

Is there any way to to db.update() in a single transaction taking parameterStream() input from another query result.
This fails in my case:

        try (final Database db = Database.test()) {
            final Flowable<String> comments = db
                .select("select name from person")
                .getAs(String.class)
                .flatMap(n -> Flowable.range(0, 10).map(i -> String.format("comment %s by %s", i, n)));

            db.update("insert into note(text) values(?)")
                .parameterStream(comments)
                .transacted()
                .transactedValuesOnly()
                .counts()
                .blockingForEach(System.out::println);
        }

with the following output:

13:13:48.019 [pool-1-thread-3] DEBUG o.d.rx.jdbc.TransactedConnection - TransactedConnection attempt close
13:13:48.021 [main] DEBUG org.davidmoten.rx.jdbc.Util - ignored exception Cannot close a connection while a transaction is still active., class java.sql.SQLException, {}
java.sql.SQLException: Cannot close a connection while a transaction is still active.
	at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
	at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
	at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown Source)
	at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown Source)
	at org.apache.derby.impl.jdbc.EmbedConnection.newSQLException(Unknown Source)
	at org.apache.derby.impl.jdbc.EmbedConnection.checkForTransactionInProgress(Unknown Source)
	at org.apache.derby.impl.jdbc.EmbedConnection.close(Unknown Source)
	at org.davidmoten.rx.jdbc.Util.closeSilently(Util.java:264)
	at org.davidmoten.rx.pool.DecoratingMember.disposeValue(DecoratingMember.java:75)
	at org.davidmoten.rx.pool.MemberSingle.disposeValues(MemberSingle.java:398)
	at org.davidmoten.rx.pool.MemberSingle.disposeAll(MemberSingle.java:391)
	at org.davidmoten.rx.pool.MemberSingle.cancel(MemberSingle.java:140)
	at org.davidmoten.rx.pool.MemberSingle.close(MemberSingle.java:384)
	at org.davidmoten.rx.pool.NonBlockingPool.close(NonBlockingPool.java:93)
	at org.davidmoten.rx.jdbc.pool.NonBlockingConnectionPool.close(NonBlockingConnectionPool.java:336)
	at org.davidmoten.rx.jdbc.Database.lambda$from$4(Database.java:72)
	at org.davidmoten.rx.jdbc.Database.close(Database.java:198)
	at TransactionTest.testSelectTransacted(TransactionTest.java:24)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: org.apache.derby.iapi.error.StandardException: Cannot close a connection while a transaction is still active.
	at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
	at org.apache.derby.impl.jdbc.SQLExceptionFactory.wrapArgsForTransportAcrossDRDA(Unknown Source)
	... 40 common frames omitted

automap function on SelectBuilder object does not work

db.select( "SELECT RecIndex,Site FROM dbo.MEAS_CARRIER where Time >= ? ORDER BY RecIndex ASC") .parameterStream(Flowable.just(Instant.now().minus(Duration.ofHours(1).plus(Duration.ofMinutes(2))))
.delay(1, TimeUnit.SECONDS).repeat())
.autoMap(CarrierMeasuremnt.class).distinct();

Your automap function returns a Flowable, and when you subscribe to the Flowable the value is null!

Can you please help?

SQL "IN" Clause

Is there any support for the SQL IN clause?
Database.test() .select("select score from person where name IN (?)") .parameters("FRED", "JOSEPH") // or a list of Strings .getAs(Integer.class) .blockingForEach(System.out::println);

I tried a bunch of different ways and I couldn't figure how I could possibly make it work.

Thank you in advance for your time!

Observable database is closed but flowable is ok

Using: com.github.davidmoten:rxjava2-jdbc:0.1-RC8
with org.xerial:sqlite-jdbc:3.20.0

Given this code:

  private final Database dataSource;
  ...
  Connection connection = DriverManager.getConnection("jdbc:sqlite:" + fileLocation)
  datasource = Database.from(Single.just(connection)...
  ...
  private Observable<VectorTileConfig> queryConfig() {
      return queryConfigFlowable().toObservable();
  }

  private Flowable<VectorTileConfig> queryConfigFlowable() {
    final URL url = Resources.getResource("metadata_raw.sql");
    String query;
    try {
      query = Resources.toString(url, Charsets.UTF_8);
    } catch (final IOException ex) {
      return Flowable.error(ex);
    }
    return dataSource.select(query)
        .get(rs -> new VectorTileConfig(rs.getInt("min_zoom"), rs.getInt("max_zoom"),
            rs.getInt("max_zoom_minx"), rs.getInt("max_zoom_miny"), rs.getInt("max_zoom_maxx"),
            rs.getInt("max_zoom_maxy")));
  }

I note that reusing the database/datasource has different behaviour. As an observable, the second query results in:

Caused by: java.sql.SQLException: database connection closed
at io.reactivex.observers.BaseTestConsumer.fail(BaseTestConsumer.java:163)
at io.reactivex.observers.BaseTestConsumer.assertValue(BaseTestConsumer.java:328)
at uk.os.vt.mbtiles.StorageImplCannedDataTest.testMultipleZoomLevelMaxZoom(StorageImplCannedDataTest.java:178)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.sql.SQLException: database connection closed
at org.sqlite.core.CoreConnection.checkOpen(CoreConnection.java:336)
at org.sqlite.jdbc4.JDBC4Connection.prepareStatement(JDBC4Connection.java:45)
at org.sqlite.jdbc3.JDBC3Connection.prepareStatement(JDBC3Connection.java:263)
at org.davidmoten.rx.jdbc.Util.prepare(Util.java:293)
at org.davidmoten.rx.jdbc.Select.lambda$create$1(Select.java:39)
at io.reactivex.internal.operators.flowable.FlowableUsing.subscribeActual(FlowableUsing.java:49)
at io.reactivex.Flowable.subscribe(Flowable.java:12994)
at io.reactivex.Flowable.subscribe(Flowable.java:12940)
at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onNext(FlowableFlatMap.java:163)
at io.reactivex.internal.subscriptions.DeferredScalarSubscription.complete(DeferredScalarSubscription.java:133)
at io.reactivex.internal.operators.single.SingleToFlowable$SingleToFlowableObserver.onSuccess(SingleToFlowable.java:63)
at io.reactivex.internal.operators.single.SingleJust.subscribeActual(SingleJust.java:30)
at io.reactivex.Single.subscribe(Single.java:2779)
at io.reactivex.internal.operators.single.SingleToFlowable.subscribeActual(SingleToFlowable.java:37)
at io.reactivex.Flowable.subscribe(Flowable.java:12994)
at io.reactivex.internal.operators.flowable.FlowableFlatMap.subscribeActual(FlowableFlatMap.java:53)
at io.reactivex.Flowable.subscribe(Flowable.java:12994)
at io.reactivex.Flowable.subscribe(Flowable.java:12940)
at io.reactivex.internal.operators.observable.ObservableFromPublisher.subscribeActual(ObservableFromPublisher.java:31)
at io.reactivex.Observable.subscribe(Observable.java:10910)
at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
at io.reactivex.Observable.subscribe(Observable.java:10910)
at uk.os.vt.mbtiles.StorageImplCannedDataTest.testMultipleZoomLevelMaxZoom(StorageImplCannedDataTest.java:176)
... 22 more

When using the Flowable I can execute multiple queries without issue.

Let me know if you'd like a PR with failing test...or if I have missed something obvious. Thanks.

Provide RxJava2-JDBC on top of R2DBC

This ticket probably defeats its creation as this project is also about JDBC.

Have you considered providing this library on top of a truly reactive database integration such as R2DBC? RxJava2-JDBC comes with a decent API for database access, however, JDBC is blocking and JDBC on ThreadPools comes with severe scalability limitations when talking about non-blocking reactive programming.

You already have all you need for proper non-blocking data access except the transport-layer. R2DBC is based on Reactive Streams. With RxJava2's Flowable, there seems be missing just a to adoption.

Having a tough time with Tx

Hello there!

I feel that I'm over complicating over, possibly overkilling it... but I need the following operations to be within a transaction:

  1. Update parent resource
  2. Delete orphan child resource
  3. Update child resource
  4. Insert new child resource
  5. Fetch just updated parent resource
  6. Fetch all child resources

My code looks like this so far...

        try (Database database = Database.fromBlocking(dataSource)) {
            return database.update(updateParentResourceQuery)
                    .parameter()
                    .transacted()
                    .counts()
                    .flatMap(tx -> {
                        return tx.update(deleteChildResource)
                                .parameter()
                                .counts()
                                .flatMap(tx2 -> {
                                    TransactedUpdateBuilder createChildResource = tx2.update(createChildResourceQuery);
                                    childResourcesToBeCreated.forEach(entityLanguage -> createChildResource
                                            .parameter());
                                    return createChildResource
                                            .counts()
                                            .flatMap(tx3 -> {
                                                TransactedUpdateBuilder updateChildResource = tx3.update(updateChildResourceQuery);
                                                childResourcesToBeUpdated.forEach(
                                                        entityLanguage -> updateChildResource
                                                                .parameter());
                                                return updateChildResource
                                                        .counts()
                                                        .flatMap(tx4 -> {
                                                            return tx4.select(selectParentResource)
                                                                    .parameter()
                                                                    .transactedValuesOnly()
                                                                    .get(new Mapper())
                                                                    .flatMap(tx5 -> {
                                                                        return tx5
                                                                                .select(selectAllChildResources)
                                                                                .parameter()
                                                                                .valuesOnly()
                                                                                .get(new Mapper())
                                                                                .flatMap(childResources -> {
                                                                                    ParentResource parentResource = tx5.value();
                                                                                    parentResource.setChildResources(childResources);
                                                                                    return Flowable.just(parentResource);
                                                                                });
                                                                    });
                                                        });
                                            });
                                });
                    });
        }

It doesn't work. When I do the parent update and fetch it back it works, but when I start chaining the operations everything goes haywire.

Any input will be much much appreciated!

Thank you very much for your time!

Automapping enums properties

Is there a way for automapping a string to enums on select and enums to string when using as query parameter?

SQLException: Operation not allowed after ResultSet closed

===> hi, every one, hi david, does anyone encounter these problem? i am desperate!
===> i want to iterate my db records from serveral tables.

===> i have the version:

rxjava2-jdbc:0.2.1
mysql-connector-java:8.0.13

===> and some of my code looks like this:

<userService.java>:
`userRepository.findAll().flatMap(new Function<User, Publisher<Tuple4<User, Shop, Company, Flux>>>() {

@OverRide
public Publisher<Tuple4<User, Shop, Company, Flux>> apply(User user) {
return Mono.zip(Mono.just(user), shopRepository.findByUserId(user.id()), companyRepository.findByUserId(user.id()), Mono.just(productRepository.findByUserId(user.id())));
}

}).subscribe(new Subscriber<Tuple4<User, Shop, Company, Flux>>() {
private Subscription subscription;
private int onNextAmount;

@OverRide
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(5);
}

@OverRide
public void onNext(Tuple4<User, Shop, Company, Flux> objects) {
// do something...
onNextAmount++;
if (onNextAmount % 5 == 0) {
this.subscription.request(5);
}
}

@OverRide
public void onError(Throwable throwable) {
}

@OverRide
public void onComplete() {
}
});`

<userRepository.java>:
public Flux findAll(Integer num) {
String sql = SELECT + "where isdelete = 0 order by id";
return Flux.from(database.select(sql).autoMap(User.class));
}

<shopRepository.java>:
public Mono findByUserId(Long userId) {
String sql = SELECT + "where user_id = ? and isdelete = 0";
return Mono.from(database.select(sql).parameter(userId).autoMap(Shop.class));
}

===> database config:
database = Database.nonBlocking()
.url(url)
.user(user)
.password(password)
.healthCheck(DatabaseType.MYSQL)
.maxPoolSize(maxPoolSize)
.build();

===> but i got these errors, sometimes is "ResultSet is from UPDATE. No Data", sometimes is "Operation not allowed after ResultSet closed", the error will occur occasionally, but rarely.

Caused by: java.sql.SQLException: ResultSet is from UPDATE. No Data.
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:89)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:63)
at com.mysql.cj.jdbc.result.ResultSetImpl.next(ResultSetImpl.java:1813)
at org.davidmoten.rx.jdbc.Select.lambda$create$6(Select.java:77)
at io.reactivex.internal.operators.flowable.FlowableInternalHelper$SimpleBiGenerator.apply(FlowableInternalHelper.java:62)
at io.reactivex.internal.operators.flowable.FlowableInternalHelper$SimpleBiGenerator.apply(FlowableInternalHelper.java:53)
at io.reactivex.internal.operators.flowable.FlowableGenerate$GeneratorSubscription.request(FlowableGenerate.java:109)

===> following is another error.

Caused by: java.sql.SQLException: Operation not allowed after ResultSet closed
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:89)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:63)
at com.mysql.cj.jdbc.result.ResultSetImpl.checkClosed(ResultSetImpl.java:470)
at com.mysql.cj.jdbc.result.ResultSetImpl.next(ResultSetImpl.java:1808)
at org.davidmoten.rx.jdbc.Select.lambda$create$6(Select.java:77)
at io.reactivex.internal.operators.flowable.FlowableInternalHelper$SimpleBiGenerator.apply(FlowableInternalHelper.java:62)
at io.reactivex.internal.operators.flowable.FlowableInternalHelper$SimpleBiGenerator.apply(FlowableInternalHelper.java:53)
at io.reactivex.internal.operators.flowable.FlowableGenerate$GeneratorSubscription.request(FlowableGenerate.java:109)
... 44 more

Enhancement of tx : tx.select(Class<?> cls)

I propose this enhancement, in a transacted select , with possibility of using annotated interface...
like
.flatMap(tx -> tx.select(Person.class)
instead of
.flatMap(tx -> tx.select("select * from person")
I put it on test in the example bellow and also I am displaying what I've been altering to your code in order for this to work :
Database.test()
.select(Person.class)
.parameter(2)
.transactedValuesOnly()
.get()
.flatMap(tx -> tx.update("update person set name=:name where id=:id")
.parameter("name", "blabla")
.parameter("id", tx.value().id())
.transactedValuesOnly()
.counts()
)
.flatMap(tx -> tx.select(Person.class)
.parameter(2)
.transactedValuesOnly()
.valuesOnly()
.get()
)
.subscribe(
v -> {out.println(v.name());},
e -> out.println(e.getMessage())
);

Here are the modifications I've made to your last version of code (maven 0.2.0 version - thanks for quick publishing btw .. I appreciate it )

in class : org.davidmoten.rx.jdbc.TxWithoutValue
add method :
<T> TransactedSelectAutomappedBuilder<T> select(Class<T> cls);
in class : org.davidmoten.rx.jdbc.TxImpl
add method :
@Override
public <T> TransactedSelectAutomappedBuilder<T> select(Class<T> cls) {
return db.tx(this).select(cls);
}

in class : org.davidmoten.rx.jdbc.TransactedBuilder
add method :
public <T> TransactedSelectAutomappedBuilder<T> select(@Nonnull Class<T> cls) {
Preconditions.checkNotNull(cls, "cls cannot be null");
return new SelectAutomappedBuilder(cls, connections, db).transacted();
}

I am not sure if is error free, I am not familiar with all the options this library has, but if you could look into it would be just great.

Transacted part of the stream seem to affect non transacted parts

Hi @davidmoten

Being pleased with rxjava-jdbc I've been doing my latest app using rxjava2-jdbc. Thanks for bringing this to us! As always it's really helpful.

However, I've hit some issue when using transactions. I have the following code:

	@Override
	public Completable insertNewEntry(int id1, String id2, int id3, boolean active) {
		return db.update(INSERT_NEW_ENTRY)
			.parameters(id1, id2, id3, active)
			.transaction()
			.flatMapCompletable(tx -> tx.update(UPDATE_ITEM_REF)
				.parameters(id2, id1)
				.countsOnly()
				.ignoreElements());
	}

	@Override
	public Completable updateEntry(int id1, Date lastSyncDate, boolean active) {
		return db.update(UPDATE_ENTRY)
			.parameters(lastSyncDate, active, id1)
			.complete();
	}

Those methods are chained using compose and flatMaps in processing "pipeline" which goal is to keep data synced between multiple services.

My stream ends with a retry/repeat to make "daemon-like".

	/**
	 * Handles repeat and retry
	 */
	Flowable<?> handleRe(Flowable<?> f) {
		AtomicLong lastSubscribe = new AtomicLong();

		Function<? super Flowable<?>, ? extends Publisher<?>> re = o -> o
			.map(c -> {
				long sinceLasteRun = System.currentTimeMillis() - lastSubscribe.get();
				long nextRunIn = config.delayBetweenRunsMs() - sinceLasteRun;
				return nextRunIn;
			})
			.delay(l -> {
				Flowable<Object> s = Flowable.just(1);
				return l > 0 ? s.delay(l, TimeUnit.MILLISECONDS) : s;
			});

		return f
			.doOnSubscribe(s -> lastSubscribe.set(System.currentTimeMillis()))
			.doOnError(t -> L.warn("It went south!", t))
			.repeatWhen(re)
			.retryWhen(re);
	}

The thing is, updateEntry never affects the DB when insertNewEntry has been run. However if the entry is already inserted (after a repeat) and we only need to update, it works fine.

So I tried removing the transaction:

	@Override
	public Completable insertNewEntry(int id1, String id2, int id3, boolean active) {
		return db.update(INSERT_NEW_ENTRY)
			.parameters(id1, id2, id3, active)
			.counts()
			.flatMapCompletable(tx -> db.update(UPDATE_ITEM_REF)
				.parameters(id2, id1)
				.complete());
	}

And now updateEntry works one the first run when insertNewEntry is ran before.
Any idea what may be going on ?

Thanks alot !

This annotation is not applicable to target 'member property without backing field or delegate'

adsiz

apply plugin: 'java'
apply plugin: 'kotlin'
apply plugin: 'kotlin-kapt'

group 'org.example'
version '1.0-SNAPSHOT'

buildscript {
    ext.kotlin_version = '1.2.0'

    repositories {
        mavenCentral()
        jcenter()
    }

    dependencies {
        classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
    }
}

sourceCompatibility = 1.8

repositories {
    mavenCentral()
    jcenter()
}

sourceSets {
    main.java.srcDirs += 'src/main/kotlin'
}

kapt {
    generateStubs = true
}

dependencies {
    implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
    testCompile group: 'junit', name: 'junit', version: '4.12'

    implementation 'com.google.code.gson:gson:2.8.2'
    implementation 'com.google.dagger:dagger:2.4'
    kapt 'com.google.dagger:dagger-compiler:2.4'

    implementation 'com.github.davidmoten:rxjava2-jdbc:0.1-RC23'
    implementation 'ch.qos.logback:logback-classic:1.2.3'

    implementation 'org.xerial:sqlite-jdbc:3.21.0.1'
}

compileKotlin { kotlinOptions.jvmTarget = "1.8" }
compileTestKotlin { kotlinOptions.jvmTarget = "1.8" }

Same NamedParameters used multiple times

MySQL

String query = "SELECT * FROM table
WHERE ST_CONTAINS(GEOMFROMTEXT('POLYGON((:neLon :neLat, :neLon :swLat, :swLon :swLat, :swLon :neLat,:neLon :neLat))'), point)"
return database
                    .select(query)
                    .parameter("swLon", swLon)
                    .parameter("swLat", swLat)
                    .parameter("neLon", neLon)
                    .parameter("neLat", neLat)
                    .get(new Mapper())

This won't work but when I hardcode the lat's and lng's it works fine. Are we not allowed to use named parameters in multiple places and assign value only once?

Thank you very much for all your help!

Kind Regards,
Vini

H2: The object is already closed [90007-197]

I'm trying to use the library with H2 and I'm finding that the connections get closed after a while, resulting in stacktraces like this:

org.h2.jdbc.JdbcSQLException: The object is already closed [90007-197]
at org.h2.message.DbException.getJdbcSQLException(DbException.java:357)
at org.h2.message.DbException.get(DbException.java:179)
at org.h2.message.DbException.get(DbException.java:155)
at org.h2.message.DbException.get(DbException.java:144)
at org.h2.jdbc.JdbcConnection.checkClosed(JdbcConnection.java:1523)
at org.h2.jdbc.JdbcConnection.checkClosed(JdbcConnection.java:1502)
at org.h2.jdbc.JdbcConnection.prepareStatement(JdbcConnection.java:692)
at org.davidmoten.rx.jdbc.pool.internal.PooledConnection.prepareStatement(PooledConnection.java:57)

I have a couple of Repositories with code like this:

@Component
public class CommandRepository {

  @Autowired private ApplicationProperties applicationProperties;

  private Database db;

  @PostConstruct
  public void connect() throws Exception {
    Connection connection =
        DriverManager.getConnection(
            applicationProperties.getDatasource().getUrl(),
            applicationProperties.getDatasource().getUsername(),
            applicationProperties.getDatasource().getPassword());
    NonBlockingConnectionPool pool =
        Pools.nonBlocking()
            .maxPoolSize(Runtime.getRuntime().availableProcessors() * 5)
            .connectionProvider(ConnectionProvider.from(connection))
            .healthCheck(DatabaseType.H2)
            .build();

    this.db = Database.from(pool);

    createTables();
  }

  private void createTables() {
    String commandPrefix = "CREATE TABLE COMMAND(";

    String createTablePostfix =
        "ID VARCHAR(50) NOT NULL, " + "JSON VARCHAR(5000) NOT NULL, " + "PRIMARY KEY (ID) " + ")";

    db.update(commandPrefix + createTablePostfix).counts().blockingForEach(System.out::println);
  }

  public Flux<Message> findAll() {
    return Flux.from(
        db.select("select * from command")
            .get(
                rs -> {
                  Message message = new Message();
                  message.setId(rs.getString("id"));
                  message.setJson(rs.getString("json"));

                  return message;
                }));
  }

  public Mono<Integer> save(Mono<Message> commandMono) {
    String createSql = "INSERT INTO command (id, json) VALUES (?, ?)";

    return commandMono.flatMap(
        c -> {
          return Mono.from(db.update(createSql).parameters(c.getId(), c.getJson()).counts());
        });
  }

  @PreDestroy
  public void shutdown() {
    db.update("drop table command").complete().blockingAwait();
  }
}

Am I doing something wrong?

Connection Timeout not caught by doOnError()

Hello, i have a problem with handling ConnectionTimeout. Look at this code:

RxJava2Adapter.flowableToFlux(dao.getAll().doOnError(e->log.error("THIS IS AN ERROR")))
                .subscribeOn(elastic).log()
                .publishOn(parallel).log()
                .publish();

where dao.getAll() call is:

public Flowable<Transit> getAll(){
Database db = Database.nonBlocking().connectionProvider(db2Config.dataSource()).build();
return db.select("select * from TABLE_A where ID=:id")
                .parameter("id","20160512205855_8203501_299566")
                .autoMap(Transit.class);
    }

Now, if there is a connection timeout, no doOnError will catch the exception. After some debugging i noticed that the error is captured by RxJava2 default error handler.

There is a way to propagate the onError signal up to our stream?

Thank you,

Transaction is not committing

I have a function that executes 2 updates in a transaction. After running the code, I don't see any change in the db. I suspect the transaction is not committed. I'm sure there is no error with the queries since doOnError is not fired. Removing transaction would work. Any idea why transaction not working?

below is my code

public Mono<Integer> update() {
        Flowable<Integer> updates = db.update("UPDATE ...")
                .parameters(...)
                .transacted()
                .counts()
                .doOnError(e-> {
                                LOGGER.debug(e);
               })
                .flatMap(tx -> {
                    return tx.update("UPDATE ...")
                            .parameters(...)
                            .counts()
                            .doOnError(e-> {
                                LOGGER.debug(e);
                            });
                });

        return Mono.from(updates);
}

below is the log

Thread: pool-2-thread-3] DEBUG [org.davidmoten.rx.jdbc.SqlInfo][] - sqlAfterSubs=<UPDATE QUERY>
Thread: pool-2-thread-3] DEBUG [org.davidmoten.rx.jdbc.TransactedUpdateBuilder][] - creating deferred flowable
Thread: pool-2-thread-3] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - subscribed
Thread: pool-2-thread-3] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - drain called
Thread: pool-2-thread-3] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - drain loop starting
Thread: pool-2-thread-3] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - requested=1
Thread: pool-2-thread-3] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - poll of available members returns DecoratingMember [value=com.mysql.cj.jdbc.ConnectionImpl@65d2b0cb]
Thread: pool-2-thread-3] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - trying to emit member
Thread: pool-2-thread-3] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - schedule.now=1557250484152, lastCheck=1557250476191
Thread: pool-2-thread-3] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - queueing member for health check DecoratingMember [value=com.mysql.cj.jdbc.ConnectionImpl@65d2b0cb]
Thread: pool-2-thread-3] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - scheduling check of DecoratingMember [value=com.mysql.cj.jdbc.ConnectionImpl@65d2b0cb]
Thread: pool-2-thread-3] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - poll of available members returns null
Thread: pool-2-thread-3] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - scheduling member creation
Thread: pool-2-thread-4] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - performing health check on DecoratingMember [value=com.mysql.cj.jdbc.ConnectionImpl@65d2b0cb]
Thread: pool-2-thread-3] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - poll of available members returns null
Thread: pool-2-thread-3] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - insufficient demand to initialize DecoratingMember [value=null]
Thread: pool-2-thread-3] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - drain called
Thread: pool-2-thread-3] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - drain loop starting
Thread: pool-2-thread-3] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - requested=1
Thread: pool-2-thread-3] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - poll of available members returns null
Thread: pool-2-thread-3] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - insufficient demand to initialize DecoratingMember [value=null]
Thread: pool-2-thread-5] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - creating value
Thread: pool-2-thread-4] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - drain called
Thread: pool-2-thread-4] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - drain loop starting
Thread: pool-2-thread-4] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - requested=1
Thread: pool-2-thread-4] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - poll of available members returns DecoratingMember [value=com.mysql.cj.jdbc.ConnectionImpl@65d2b0cb]
Thread: pool-2-thread-4] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - trying to emit member
Thread: pool-2-thread-4] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - schedule.now=1557250484155, lastCheck=1557250484155
Thread: pool-2-thread-4] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - no health check required for DecoratingMember [value=com.mysql.cj.jdbc.ConnectionImpl@65d2b0cb]
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.jdbc.Util][] - creating new TransactedConnection
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.jdbc.TransactedConnection][] - constructing TransactedConnection from org.davidmoten.rx.jdbc.pool.internal.PooledConnection@140a7cbd, 1
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.jdbc.Update][] - <UPDATE QUERY>
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.jdbc.SqlInfo][] - sqlAfterSubs=<UPDATE QUERY>
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.jdbc.Util][] - preparing statement: <UPDATE QUERY>
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.jdbc.Util][] - setting parameter ...
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.jdbc.Update][] - batch added with [...]
Thread: pool-2-thread-5] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - checking in DecoratingMember [value=com.mysql.cj.jdbc.ConnectionImpl@779e8a73]
Thread: pool-2-thread-5] DEBUG [org.davidmoten.rx.pool.DecoratingMember][] - scheduled release in 1800000ms of DecoratingMember [value=com.mysql.cj.jdbc.ConnectionImpl@779e8a73]
Thread: pool-2-thread-5] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - drain called
Thread: pool-2-thread-5] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - drain loop starting
Thread: pool-2-thread-5] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - requested=0
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.jdbc.Update][] - batch executed
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.jdbc.TransactedConnection][] - forking connection
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.jdbc.TransactedConnection][] - constructing TransactedConnection from org.davidmoten.rx.jdbc.pool.internal.PooledConnection@140a7cbd, 3
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.jdbc.SqlInfo][] - sqlAfterSubs=<UPDATE QUERY 2>
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.jdbc.TransactedUpdateBuilder][] - creating deferred flowable
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.jdbc.Update][] - <UPDATE QUERY 2>
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.jdbc.SqlInfo][] - sqlAfterSubs=<UPDATE QUERY 2>
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.jdbc.Util][] - preparing statement: <UPDATE QUERY 2>
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.jdbc.Util][] - setting parameter ...
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.jdbc.Update][] - batch added with [...]
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.jdbc.Update][] - batch executed
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - drain called
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - drain loop starting
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.pool.MemberSingle][] - requested=0
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.jdbc.Util][] - closing org.davidmoten.rx.jdbc.TransactedPreparedStatement@53da613f
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.jdbc.Util][] - closing TransactedConnection [con=org.davidmoten.rx.jdbc.pool.internal.PooledConnection@140a7cbd, counter=4]
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.jdbc.TransactedConnection][] - TransactedConnection attempt close
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.jdbc.Util][] - closing org.davidmoten.rx.jdbc.TransactedPreparedStatement@5340c1b2
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.jdbc.Util][] - closing TransactedConnection [con=org.davidmoten.rx.jdbc.pool.internal.PooledConnection@140a7cbd, counter=4]
Thread: pool-2-thread-6] DEBUG [org.davidmoten.rx.jdbc.TransactedConnection][] - TransactedConnection attempt close

subscriber schedular keep alive after complete

hi

i use rxjava2 jdbc, when subscriber terminate the owner schedular keep alive ,i forse to shutdow it with create a schedular type io and i observerOn this schedular, so when subscriber complete task i try to shutdown this schedular but it stay alive !! the shutdown() method can't kill it !!!! how to release resources (schedulars) when subscriber complete ??

concurrency test failure in 0.1-RC2

Concurrency testing threw ArrayIndexOutOfBoundsException as below. Problem has been identified and fix coming soon.

Caused by: java.lang.ArrayIndexOutOfBoundsException: 35
	at org.davidmoten.rx.pool.MemberSingle.emit(MemberSingle.java:130)
	at org.davidmoten.rx.pool.MemberSingle.drain(MemberSingle.java:106)
	at org.davidmoten.rx.pool.MemberSingle.subscribeActual(MemberSingle.java:182)
	at io.reactivex.Single.subscribe(Single.java:2700)
	... 13 more

Default value for the null column?

How can we handle the null value for the data mapping process? I'm getting the encoding exception if the value of the column coming back from SQL database is NULL.

org.springframework.core.codec.EncodingException: JSON encoding error: (was java.lang.NullPointerException); nested exception is com.fasterxml.jackson.databind.JsonMappingException: (was java.lang.NullPointerException) (through reference chain: java.util.ArrayList[0]->com.sun.proxy.$Proxy96["sales_dollars"])
at org.springframework.http.codec.json.AbstractJackson2Encoder.encodeValue(AbstractJackson2Encoder.java:135) ~[spring-web-5.0.0.RELEASE.jar:5.0.0.RELEASE]
at org.springframework.http.codec.json.AbstractJackson2Encoder.lambda$encode$3(AbstractJackson2Encoder.java:108) ~[spring-web-5.0.0.RELEASE.jar:5.0.0.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107) ~[reactor-core-3.1.0.RELEASE.jar:3.1.0.RELEASE]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1085) ~[reactor-core-3.1.0.RELEASE.jar:3.1.0.RELEASE]
at reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:117) ~[reactor-core-3.1.0.RELEASE.jar:3.1.0.RELEASE]
at io.reactivex.internal.util.HalfSerializer.onComplete(HalfSerializer.java:91) ~[rxjava-2.1.7.jar:na]
at io.reactivex.internal.subscribers.StrictSubscriber.onComplete(StrictSubscriber.java:109) ~[rxjava-2.1.7.jar:na]
at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drainLoop(FlowableFlatMap.java:426) ~[rxjava-2.1.7.jar:na]
at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drain(FlowableFlatMap.java:366) ~[rxjava-2.1.7.jar:na]
at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onComplete(FlowableFlatMap.java:338) ~[rxjava-2.1.7.jar:na]
at io.reactivex.internal.subscriptions.DeferredScalarSubscription.complete(DeferredScalarSubscription.java:135) ~[rxjava-2.1.7.jar:na]
at io.reactivex.internal.operators.single.SingleToFlowable$SingleToFlowableObserver.onSuccess(SingleToFlowable.java:63) ~[rxjava-2.1.7.jar:na]
at io.reactivex.internal.operators.single.SingleMap$MapSingleObserver.onSuccess(SingleMap.java:64) ~[rxjava-2.1.7.jar:na]
at org.davidmoten.rx.pool.MemberSingle$Emitter.run(MemberSingle.java:528) ~[rxjava2-jdbc-0.1-RC23.jar:na]
at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker$BooleanRunnable.run(ExecutorScheduler.java:260) ~[rxjava-2.1.7.jar:na]
at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker.run(ExecutorScheduler.java:225) ~[rxjava-2.1.7.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_131]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_131]
Caused by: com.fasterxml.jackson.databind.JsonMappingException: (was java.lang.NullPointerException) (through reference chain: java.util.ArrayList[0]->com.sun.proxy.$Proxy96["sales_dollars"])
at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:391) ~[jackson-databind-2.9.1.jar:2.9.1]
at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:351) ~[jackson-databind-2.9.1.jar:2.9.1]
at com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:316) ~[jackson-databind-2.9.1.jar:2.9.1]
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:727) ~[jackson-databind-2.9.1.jar:2.9.1]
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:155) ~[jackson-databind-2.9.1.jar:2.9.1]
at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serializeContents(CollectionSerializer.java:145) ~[jackson-databind-2.9.1.jar:2.9.1]
at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serialize(CollectionSerializer.java:107) ~[jackson-databind-2.9.1.jar:2.9.1]
at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serialize(CollectionSerializer.java:25) ~[jackson-databind-2.9.1.jar:2.9.1]
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480) ~[jackson-databind-2.9.1.jar:2.9.1]
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:400) ~[jackson-databind-2.9.1.jar:2.9.1]
at com.fasterxml.jackson.databind.ObjectWriter$Prefetch.serialize(ObjectWriter.java:1392) ~[jackson-databind-2.9.1.jar:2.9.1]
at com.fasterxml.jackson.databind.ObjectWriter._configAndWriteValue(ObjectWriter.java:1120) ~[jackson-databind-2.9.1.jar:2.9.1]
at com.fasterxml.jackson.databind.ObjectWriter.writeValue(ObjectWriter.java:950) ~[jackson-databind-2.9.1.jar:2.9.1]
at org.springframework.http.codec.json.AbstractJackson2Encoder.encodeValue(AbstractJackson2Encoder.java:129) ~[spring-web-5.0.0.RELEASE.jar:5.0.0.RELEASE]
... 18 common frames omitted
Caused by: java.lang.NullPointerException: null
at com.sun.proxy.$Proxy96.sales_dollars(Unknown Source) ~[na:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_131]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_131]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_131]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_131]
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:687) ~[jackson-databind-2.9.1.jar:2.9.1]
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:719) ~[jackson-databind-2.9.1.jar:2.9.1]
... 28 common frames omitted

Feature Request: hook to get ResultSet before calling next

First things first - thanks for the amazing state of the art software.

I use it to stream large volume of data. It will be awesome to have a hook/callback called at the moment when ResultSet is just obtained. In particular I map ResultSet to CSV. It is all fine if result is not empty however I'd like return just a header (column names) when there the result is empty something like below.

public <T> Flowable<T> get(BiFunction<ResultSet, T> onResultSetIsReady, BiFunction<ResultSet, T> mapper) 

Or may be extends ResultSetMapper?

And use it something like below

sb.select(sql).get(::toHeaderLine, ::toCsvLine): Flowable<String>

Thanks one more time for the amazing software

closed connection issue

I am getting below error intermittently , is there something i am missing

java.sql.SQLException: Closed Connection
	at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:112) ~[ojdbc14-10.2.0.4.0.jar:Oracle JDBC Driver version - "10.2.0.4.0"]
	at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:146) ~[ojdbc14-10.2.0.4.0.jar:Oracle JDBC Driver version - "10.2.0.4.0"]
	at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:208) ~[ojdbc14-10.2.0.4.0.jar:Oracle JDBC Driver version - "10.2.0.4.0"]
	at oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:3544) ~[ojdbc14-10.2.0.4.0.jar:Oracle JDBC Driver version - "10.2.0.4.0"]
	at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3351) ~[ojdbc14-10.2.0.4.0.jar:Oracle JDBC Driver version - "10.2.0.4.0"]
	at oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:3415) ~[ojdbc14-10.2.0.4.0.jar:Oracle JDBC Driver version - "10.2.0.4.0"]
	at org.davidmoten.rx.jdbc.pool.internal.ConnectionNonBlockingMemberPreparedStatement.executeQuery(ConnectionNonBlockingMemberPreparedStatement.java:55) ~[rxjava2-jdbc-0.1-RC35.jar:na]
	at org.davidmoten.rx.jdbc.Select.lambda$create$5(Select.java:72) ~[rxjava2-jdbc-0.1-RC35.jar:na]
	at io.reactivex.internal.operators.flowable.FlowableGenerate.subscribeActual(FlowableGenerate.java:45) ~[rxjava-2.1.16.jar:na]
	at io.reactivex.Flowable.subscribe(Flowable.java:14419) ~[rxjava-2.1.16.jar:na]
	at io.reactivex.Flowable.subscribe(Flowable.java:14365) ~[rxjava-2.1.16.jar:na]
	at io.reactivex.internal.operators.flowable.FlowableScalarXMap$ScalarXMapFlowable.subscribeActual(FlowableScalarXMap.java:160) ~[rxjava-2.1.16.jar:na]
	at io.reactivex.Flowable.subscribe(Flowable.java:14419) ~[rxjava-2.1.16.jar:na]
	at io.reactivex.Flowable.subscribe(Flowable.java:14365) ~[rxjava-2.1.16.jar:na]
	at io.reactivex.internal.operators.flowable.FlowableUsing.subscribeActual(FlowableUsing.java:74) ~[rxjava-2.1.16.jar:na]
	at io.reactivex.Flowable.subscribe(Flowable.java:14419) ~[rxjava-2.1.16.jar:na]
	at io.reactivex.Flowable.subscribe(Flowable.java:14365) ~[rxjava-2.1.16.jar:na]
	at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onNext(FlowableFlatMap.java:163) ~[rxjava-2.1.16.jar:na]
	at io.reactivex.internal.subscriptions.DeferredScalarSubscription.complete(DeferredScalarSubscription.java:133) ~[rxjava-2.1.16.jar:na]
	at io.reactivex.internal.operators.single.SingleToFlowable$SingleToFlowableObserver.onSuccess(SingleToFlowable.java:63) ~[rxjava-2.1.16.jar:na]
	at io.reactivex.internal.operators.single.SingleMap$MapSingleObserver.onSuccess(SingleMap.java:64) ~[rxjava-2.1.16.jar:na]
	at org.davidmoten.rx.pool.MemberSingle$Emitter.run(MemberSingle.java:512) ~[rxjava2-pool-0.1-RC35.jar:na]
	at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker$BooleanRunnable.run(ExecutorScheduler.java:261) ~[rxjava-2.1.16.jar:na]
	at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker.run(ExecutorScheduler.java:226) ~[rxjava-2.1.16.jar:na]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_161]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_161]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_161]

my DB configuration

Class.forName("oracle.jdbc.driver.OracleDriver");
             Connection connection = DriverManager.getConnection(
                     "URL",
                     "user",
                     "pas");
             
            
             NonBlockingConnectionPool pool =
                     Pools.nonBlocking()
                     		
                             .maxPoolSize(Runtime.getRuntime().availableProcessors() * 5)
                             .connectionProvider(ConnectionProvider.from(connection)).healthCheck(DatabaseType.ORACLE).idleTimeBeforeHealthCheck(5, TimeUnit.SECONDS)
                             .createRetryInterval(5, TimeUnit.SECONDS)
                             .build();
             con = connection;
             return Database.from(pool);

Database.from(<DataSource>) has been removed

I'm using rxjava-jdbc and I use Database.from() pretty much everywhere. I'm upgrading to RxJava2 and see that that is no longer possible.

Is this intentional? Is there a straightforward work around?

defaultIfEmpty() possibly buggy...

Hello there!

I'm getting
java.lang.RuntimeException: onNext called with null. Null values are generally not allowed in 2.x operators and sources.
When the select doesn't have any retuned value (null), shouldn't it default to -1 instead of passing on null?

            return database
                    .select("SELECT SUM(amount) FROM table")
                    .parameter("param1", param1)
                    .parameter("param2", param2)
                    .getAs(BigDecimal.class)
                    .defaultIfEmpty(new BigDecimal(-1));

Once again thank you thank you for your time!

Simple insert with transaction is not happening

Trying to execute following insert query does not commit transaction due to transaction reference counter is not decremented.

db.update( "INSERT INTO ADDRESS_REL (ADDRESS_ID, ADDRESS_DETAILS ) VALUES ( ?, ? )")
.parameterStream(Flowable.just(105L, "Address of 105"))
.transacted()
.counts()
.blockingSubscribe(tx -> System.out.println(tx.isComplete() ? "complete" : tx.value()));

Do I missed any hint to transaction ?

returnGeneratedKeys with auto commit enabled

When I use the returnGeneratedKeys method with auto commit enabled then an error is emitted downstream. Namely, the code of this form

db.update("INSERT INTO table1(col1, col2, col3) VALUES(?, ?, ?)")
	.parameters("param1", "param2", "param3")
	.returnGeneratedKeys()
	.autoMap(MyClass.class)
	.singleOrError();

correctly emits an inserted object but the following error is raised on completion:

org.postgresql.util.PSQLException: Cannot commit when autoCommit is enabled.

It looks like a flowable constructed by returnGeneratedKeys performs a commit on completion and is oblivious to the fact that the underlying connection has auto commits enabled. Connection pools with auto commit enabled seem to be the default(?).
Am I doing something wrong?

Bug with MaxIdleTime

When the parameter MaxIdleTime is used, idle connections are properly closed after the given duration. However, it seems they are not properly evicted from the pool (or re-connection does not happen). It results in a random ‘SocketException: Socket closed’ when one of those connections is used later on by the scheduler.

It seems the issue occurs only with specific values in configuration, such as :

Pool pool = Pools.nonBlocking()
.url(url)
.user(username)
.password(password)
.healthCheck(DatabaseType.POSTGRES)
.maxPoolSize(40)
.maxIdleTime(2, TimeUnit.MINUTES)
.idleTimeBeforeHealthCheck(5, TimeUnit.SECONDS)
.connectionRetryInterval(10, TimeUnit.SECONDS)
.build()

With different values, the problem will not occur.

Context :

Database : PGv9.6
Driver-PG : v42.2.5
RxJava2-jdbc : v0.2.4

Thanks for your time

missing custom casting

idk if this can be classified as an issue but i have problem casting string to a custom class in automaping
i see no option in automap example to cast string field to customclass
example below
db.select(TEST.class)
.get()
.subscribe(
E -> System.out.println(E.getA())
).dispose();

deffinition of TEST interface :
@query("select '{"X":1, "Y":2}'::json "A";")

public interface TEST {

@Column("A")
    CUSTOMTYPE getA();

}
deffinition of CUSTOMTYPE interface :
public interface CUSTOMTYPE {

@Column("X")
int getX();

@Column("Y")
int getY();

}

springboot2 webflux(2.1.2)+rxjava2-jdbc nodata

@configuration
public class RxJava2JdbcConfig {
/**
* https://www.programcreek.com/java-api-examples/index.php?api=com.github.davidmoten.rx.jdbc.Database
* @return
* @throws Exception
*/
@bean
public Database db() throws Exception {
ConnectionProvider connection = ConnectionProvider.from("jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true", "root", "123456");
NonBlockingConnectionPool pool =
Pools.nonBlocking()
.maxPoolSize(Runtime.getRuntime().availableProcessors() * 5)
.connectionProvider(connection)
.build();
return Database.from(pool);
}
}

public Flux getAllAdverts3() {
String sql = "SELECT name,code FROM company";
return Flux.from(db.select(sql).autoMap(Company.class));
}

@query("select name, code from company")
public interface Company {
@column("name")
String name();

@Column("code") 
String code();

}

result :

[
{}
]
but mysql have data

mapping with select(Class<?>) in transaction fails

i admire your work so much, is just awesome, but
i discovered a bug :
Reproducing the problem :
Database.test() .select(Person.class) .transacted().valuesOnly() .map(Person::name)// or .map(m -> m.name()) .blockingForEach(v -> System.out.println(v));

throws an error : cannot convert java.lang.String to Person ....
even if IDE doesn't complain about anything ...
seems like in transaction, the library is maping the first column on the resultset only ...
i tested with first column of type integer .. and i get the error: cannot convert Integer to Person ....

fixing is simple :

replace the syntax
[rs -> Util.mapObject(rs, sb.cls, 1),]
with syntax
[Util.autoMap(sb.cls)]
in method
[private static Flowable<Tx> createFlowable(SelectAutomappedBuilder sb, Database db)]
from class [org.davidmoten.rx.jdbc.TransactedSelectAutomappedBuilder]

private static <T> Flowable<Tx<T>> createFlowable(SelectAutomappedBuilder<T> sb, Database db) { return Flowable.defer(() -> { AtomicReference<Connection> connection = new AtomicReference<Connection>(); return Select.create( sb.selectBuilder.connection.map(c -> Util.toTransactedConnection(connection, c)), // sb.selectBuilder.parameterGroupsToFlowable(), // sb.selectBuilder.sql, // sb.selectBuilder.fetchSize, // //rs -> Util.mapObject(rs, sb.cls, 1), //please comment this line to fix the bug Util.autoMap(sb.cls),//this is the replacing line for fixing the bug false ) // .materialize() // .flatMap(n -> Tx.toTx(n, connection.get(), db)) // .doOnNext(tx -> { if (tx.isComplete()) { ((TxImpl<T>) tx).connection().commit(); } }); }); }
also . as enhancement

would you considering adding this method as an option
in
class [org.davidmoten.rx.jdbc.TransactedSelectAutomappedBuilder]
inner class [TransactedSelectAutomappedBuilderValuesOnly]
public <R> Flowable<R> get(@Nonnull Function<? super T, ? extends R> function) { return createFlowable(b.selectBuilder, db).flatMap(Tx.flattenToValuesOnly()).map(function); }

this way ... i could do this :

Database.test() .select(Person.class) .transacted().valuesOnly() .get(Person::name) .blockingForEach(v -> System.out.println(v));

instead of :

Database.test() .select(Person.class) .transacted().valuesOnly() .get() .map(Person::name)// or .map(m -> m.name()) .blockingForEach(v -> System.out.println(v));

ArrayIndexOutOfBoundsException in a connection to Redis

Hi there,

We use Sprint WebFlux with Lettuce for Redis connections. We 've had an incident with a frozen Lettuce thread, because of
java.lang.ArrayIndexOutOfBoundsException at at org.davidmoten.rx.pool.MemberSingle.tryEmit(MemberSingle.java:276)

Please advise.

Here is the stacktrace:

stack_trace
java.lang.NullPointerException: subscribeActual failed
at io.reactivex.Single.subscribe(Single.java:3671)
at io.reactivex.internal.operators.single.SingleMap.subscribeActual(SingleMap.java:34)
at io.reactivex.Single.subscribe(Single.java:3666)
at io.reactivex.internal.operators.single.SingleToFlowable.subscribeActual(SingleToFlowable.java:37)
at io.reactivex.Flowable.subscribe(Flowable.java:14935)
at io.reactivex.internal.operators.flowable.FlowableFlatMap.subscribeActual(FlowableFlatMap.java:53)
at io.reactivex.Flowable.subscribe(Flowable.java:14935)
at io.reactivex.Flowable.subscribe(Flowable.java:14885)
at reactor.core.publisher.MonoFromPublisher.subscribe(MonoFromPublisher.java:63)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
at com.axway.salesforcePanelService.utils.MDCSubscriberWrapper.onNext(MDCUtils.kt:75)
at reactor.core.publisher.MonoMetrics$MetricsSubscriber.onNext(MonoMetrics.java:127)
at com.axway.salesforcePanelService.utils.MDCSubscriberWrapper.onNext(MDCUtils.kt:75)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
at com.axway.salesforcePanelService.utils.MDCSubscriberWrapper.onNext(MDCUtils.kt:75)
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:358)
at com.axway.salesforcePanelService.utils.MDCSubscriberWrapper.onNext(MDCUtils.kt:75)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at com.axway.salesforcePanelService.utils.MDCSubscriberWrapper.onNext(MDCUtils.kt:75)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
at com.axway.salesforcePanelService.utils.MDCSubscriberWrapper.onNext(MDCUtils.kt:75)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at com.axway.salesforcePanelService.utils.MDCSubscriberWrapper.onNext(MDCUtils.kt:75)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
at com.axway.salesforcePanelService.utils.MDCSubscriberWrapper.onNext(MDCUtils.kt:75)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:281)
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:860)
at com.axway.salesforcePanelService.utils.MDCSubscriberWrapper.onNext(MDCUtils.kt:75)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73)
at com.axway.salesforcePanelService.utils.MDCSubscriberWrapper.onNext(MDCUtils.kt:75)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at com.axway.salesforcePanelService.utils.MDCSubscriberWrapper.onNext(MDCUtils.kt:75)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at com.axway.salesforcePanelService.utils.MDCSubscriberWrapper.onNext(MDCUtils.kt:75)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
at io.lettuce.core.RedisPublisher$ImmediateSubscriber.onNext(RedisPublisher.java:886)
at io.lettuce.core.RedisPublisher$RedisSubscription.onNext(RedisPublisher.java:291)
at io.lettuce.core.RedisPublisher$SubscriptionCommand.doOnComplete(RedisPublisher.java:773)
at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:65)
at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:63)
at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:742)
at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:677)
at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:594)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
at org.davidmoten.rx.pool.MemberSingle.tryEmit(MemberSingle.java:276)
at org.davidmoten.rx.pool.MemberSingle.drain(MemberSingle.java:190)
at org.davidmoten.rx.pool.MemberSingle.subscribeActual(MemberSingle.java:113)
at io.reactivex.Single.subscribe(Single.java:3666)
... 60 common frames omitted

thread_name
lettuce-nioEventLoop-7-1

==============================

This is part of build.gradle

plugins {
id("org.springframework.boot") version "2.4.7"
id("io.spring.dependency-management") version "1.0.11.RELEASE"
id("com.gorylenko.gradle-git-properties") version "2.2.4"

kotlin("jvm") version "1.4.30"
kotlin("plugin.spring") version "1.4.30"

}

ext["snakeyaml.version"] = "1.28"
ext["netty.version"] = "4.1.63.Final"

allprojects {
repositories {
mavenCentral()
jcenter()
}

dependencyManagement {
    dependencies {
        dependency("commons-io:commons-io:2.7")
    }
}

}

dependencies {
implementation("org.springframework.boot:spring-boot-starter-data-redis-reactive")
implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("io.micrometer:micrometer-registry-jmx")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")

implementation("org.apache.commons:commons-text:1.8")

implementation("org.apache.commons:commons-lang3")
implementation("commons-codec:commons-codec")
implementation("commons-cli:commons-cli:1.4")

implementation("net.logstash.logback:logstash-logback-encoder:5.0")

implementation("com.github.davidmoten:rxjava2-jdbc:0.2.7")
implementation("org.postgresql:postgresql:42.2.19")

testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("io.projectreactor:reactor-test")
testImplementation("it.ozimov:embedded-redis:0.7.1")
testImplementation("org.assertj:assertj-core:3.18.1")

}

Select-statement with date

Hello!

I'm currently working with your framework and having some issues with getting values out of a mysql-db. I have a table with |id|date| in mysql, in selecting in mysql with the format yyyy-MM-dd HH:mm:ss everything works as expected. The id is a unique string and the date of type timestamp.

But when I'm doing a select with your framework I get nothing.

The code looks like this:

Flowable<Integer> update = this.database
                    .update("insert into steps(date) values(?)")
                    .parameterStream(replayDate).counts();
return this.database
                    .select("select id from steps where date = '?'")
                    .parameterStream(replayDate
                            .map(replayContent -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(replayContent))
                    ).dependsOn(update).getAs(String.class));

Also when I'm doing just a select I can't get an id for the date (and yeah there is a date in the db).
What I'm doing wrong? Already tried for hours, but won't get some data out of the db. Also with the format yyyy-MM-dd'T'HH:mm:ssZ nothing happens.

I also can provide the whole project if you need.

Thanks in advance!
Best regards

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.