jakewharton / retrofit2-reactor-adapter Goto Github PK
View Code? Open in Web Editor NEWA Project Reactor CallAdapter.Factory implementation for Retrofit 2.
License: Apache License 2.0
A Project Reactor CallAdapter.Factory implementation for Retrofit 2.
License: Apache License 2.0
hi
data class Person(var name: String,
var age: Int)
interface RouteService {
@get("/") fun findAll() : Flux
@get("/{name}") fun findByName(@path("name") name: String) : Mono
}
val retrofit = Retrofit.Builder().apply {
baseUrl("http://localhost:8888")
addConverterFactory(GsonConverterFactory.create())
addCallAdapterFactory(ReactorCallAdapterFactory.create())
}.build()
val restService = retrofit.create(RouteService::class.java)
restService.findAll().log().subscribe()
when i exexcute i see this exception :
Caused by: java.lang.IllegalStateException: Expected BEGIN_OBJECT but was BEGIN_ARRAY at line 1 column 2 path $
at com.google.gson.stream.JsonReader.beginObject(JsonReader.java:385)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:213)
... 18 more
whats wrong here ???
I spent a week a year ago trying to receive a Flux from a simple Retrofit client app using this adapter and a Spring server whose controller emits Flux with variations of every possible reasonable value for "produces = [MediaType.*". I gave up after days of trying to get the client to be able to read streaming FooBars.
Yesterday I thought I'd give it another shot but this time with just Flux which, I realize, semantically, is the exact same thing in terms of your adapter and any Spring controller. Nevertheless, I still was unable to just receive a simple Flux stream.
Is there some other MediaType I need to set to get this to work out of the box? I'm a seriously good Googler and I've never come across a single post or StackOverflow thread where someone has had the same issue other than a GitHub issue you responded to back in the day about how to handle Flux<List> which I felt was not really related to my issue here since has default converter so I shouldn't need to write a custom one.
I know you're super busy at Google now, but once you accepted a pull request I made for a fix/enhancement to one of your repos and I lucked out then, so who knows, it was worth a shot!
Thanks if you have time to respond!
Below is the code:
Using MediaType.APPLICATION_JSON ("application/json") produces the error
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Expected an int but was BEGIN_ARRAY at line 1 column 2 path $
Caused by: java.lang.IllegalStateException: Expected an int but was BEGIN_ARRAY at line 1 column 2 path $
Using MediaType.APPLICATION_JSON_VALUE ("application/x-ndjson") produces the error
reactor.core.Exceptions$ErrorCallbackNotImplemented: com.google.gson.stream.MalformedJsonException: Use JsonReader.setLenient(true) to accept malformed JSON at line 2 column 2 path $
Caused by: com.google.gson.stream.MalformedJsonException: Use JsonReader.setLenient(true) to accept malformed JSON at line 2 column 2 path $
Here is my controller:
@RestController
class MyController {
@GetMapping("/ints", produces = [MediaType.APPLICATION_JSON_VALUE])
fun produceInts(): Flux<Int> {
return Flux.interval(Duration.ofSeconds(1))
.map {
Random.nextInt().also { println("Server: $it) }
}
}
}
And here is my client application:
private interface ApiService {
@GET("/ints")
fun getInts(): Flux<Int>
}
fun main() {
val retrofit = Retrofit.Builder()
.baseUrl("http://localhost:8080")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(ReactorCallAdapterFactory.create())
.build()
val api = retrofit.create(ApiService::class.java)
ints.subscribe { value ->
println("REACTOR CLIENT RECEIVED: $value")
}
while (true) {
val result = api.getInts()
result.subscribe { value ->
println("Client: $value")
}
Thread.sleep(Duration.ofSeconds(1).toMillis())
}
}
In Reactor 3.1 Cancellable interface was replaced by Disposable, and now retrofit2-reactor-adapter is crashing with an error:
java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: reactor/core/Cancellation
at com.jakewharton.retrofit2.adapter.reactor.CallSinkConsumer.accept(CallSinkConsumer.java:35) ~[retrofit2-reactor-adapter-1.0.0.jar:?]
at com.jakewharton.retrofit2.adapter.reactor.CallSinkConsumer.accept(CallSinkConsumer.java:24) ~[retrofit2-reactor-adapter-1.0.0.jar:?]
at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:90) ~[reactor-core-3.1.0.M1.jar:3.1.0.M1]
https://github.com/reactor/reactor-core/releases/tag/v3.1.0.M1
Can you provide me with a few more details on how Flux works? In your tests you only get a single item from the server. Is it limited to that, or will it make more requests?
Hello,
This could be a bug or a question, I am not sure which.
When I get a 204 response from the http server, Mono is currently returning a null
instead of being empty
. As far as I understand Mono it is 0 or 1 element in it. But, I could be wrong.
To recreate, I added this method to your MonoTest
and it passes:
@Test public void bodySuccess204() {
server.enqueue(new MockResponse().setResponseCode(204));
RecordingSubscriber<String> subscriber = subscriberRule.create();
service.body().subscribe(subscriber);
subscriber.assertValue(null).assertComplete();
}
The question then is:
In the documentation of ReactorCallAdapterFactory it states that if you provide a scheduler it will modify the outcome of the request to be executed on a thread managed by the scheduler provided by using publishOn.
I would assume this functionality is added to provide a way for users to get off the netty response thread quickly after returning from webclient where you may potentially do blocking work on the netty response thread downstream as documented on https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#webflux-concurrency-model
However in the code it doesnt use publishOn it uses subscribeOn ReactoryCallAdapter which I believe does nothing except moves the subscription to the schedulers thread, than executes the request and afterwards leaves the response to be handled on the netty thread incorrectly.
I believe this should be using publishOn as documented to move the response handling to a thread controlled by the scheduler provided.
I originally reported on a dependant project: spring-projects-experimental/spring-cloud-square#27
Hi,
Something like this works:
@GET("getJsonArrayOfObjects")
Flux<List<MyJsonObject>> getJsonArrayOfObjects(@Query("name") String name);
while
@GET("getJsonArrayOfObjects")
Flux<MyJsonObject> getJsonArrayOfObjects(@Query("name") String name);
does not.
Btw I am using the JacksonConverterFactory
to deserialize the objects.
I am not sure whether this makes sense, but when I query the external sample rest api I'd rather get a Flux, which emits each and every item of the list rather than a Flux, which emits the whole list.
So I'd like to have the possibility to use Flux<MyJsonObject>.fromArray(MyJsonObject[])
Does this make sense? And in case it does. Can you explain to me which code needs to be adjusted to implement this feature, so that I can create a pull request for this feature?
Thanks in advance and I really appreciate your work.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.