Comments (11)
Tagging you since you're pretty objective @raniejade. What do you think?
from rxjavafx.
Hmmm, kinda looks like Observable#merge to me. I do find it useful, but a bit redundant.
from rxjavafx.
Yes, it is basically Observable#merge, but it allows Observables to be added or removed at any time on a centralized instance. Additions and removals also affect all subscriptions. I'll create an example later showing why this is needed. Without it, you can get into a bit of trouble using Observable#merge with complex UIs.
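To make the limitation concrete, here is a minimal sketch, assuming RxJava 1.x (the rx.* packages the snippets in this thread appear to use). The PublishSubject instances and the names button1, button2 and lateButton are hypothetical stand-ins for button click streams. It only illustrates that Observable#merge fixes its set of sources at the moment it is called.

```kotlin
import rx.Observable
import rx.subjects.PublishSubject

// Hypothetical stand-ins for three button click streams.
fun fixedMergeDemo(): List<String> {
    val button1 = PublishSubject.create<String>()
    val button2 = PublishSubject.create<String>()
    val lateButton = PublishSubject.create<String>()
    val received = mutableListOf<String>()

    // The merged stream only knows about the two sources passed in here.
    Observable.merge(button1, button2).subscribe { received += it }

    button1.onNext("click 1")
    button2.onNext("click 2")
    lateButton.onNext("click 3") // never reaches the subscriber above

    return received
}

fun main() {
    println(fixedMergeDemo()) // [click 1, click 2]
}
```

To include lateButton, a new merged Observable would have to be built and every subscriber resubscribed, which is the bookkeeping a centralized instance is meant to avoid.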
from rxjavafx.
Follow-up question: what happens when one of the source Observables terminates with an error?
from rxjavafx.
Good question. I think the pooled Observable will continue on without it unless you use a retry. I'll need to do some tests today regarding error behavior, but I think that may be on the developer to handle.
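For reference while testing, here is a small sketch of how a plain Observable#merge treats an error in RxJava 1.x (assumed version). The subjects a and b are hypothetical sources. With the default, non-delaying merge, an error from any one source is forwarded immediately and the merged stream terminates, so later items from the healthy source are not delivered.

```kotlin
import rx.Observable
import rx.subjects.PublishSubject

fun mergeErrorDemo(): List<String> {
    val a = PublishSubject.create<Int>() // hypothetical healthy source
    val b = PublishSubject.create<Int>() // hypothetical failing source
    val log = mutableListOf<String>()

    Observable.merge(a, b).subscribe(
        { log += "next $it" },
        { log += "error ${it.message}" },
        { log += "completed" }
    )

    a.onNext(1)
    b.onError(RuntimeException("boom")) // terminates the merged stream
    a.onNext(2)                         // dropped: the subscriber is already terminated

    return log
}

fun main() {
    println(mergeErrorDemo()) // [next 1, error boom]
}
```

flatMap follows the same rule, so a pooled Observable built on it would need retry() or a per-source error handler to keep going.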
from rxjavafx.
Alright, I discovered a few issues. The error communication is not working like I expected: it just stops pushing items when an error occurs downstream. Subjects are something I am not an expert in, especially when it comes to error handling.
Here is my test app.
class MyApp : App() {
    override val primaryView = TestView::class
}
object Decrementer {
    val numbers = Observable.just(5.0, 4.0, 3.0, 2.0, 1.0, 0.0, -1.0, -2.0, -3.0)
    val decrementRequests = ObservableBus<ActionEvent>()
    val decrements = decrementRequests.toObservable()
        .zipWith(numbers) { ae, dbl -> dbl }
}
class TestView : View() {
    override val root = VBox()
    init {
        with(root) {
            label("start") {
                Decrementer.decrements
                    .doOnNext { if (it == 0.0) throw Exception("Zero not allowed!") }
                    .retry()
                    .subscribe { text = it.toString() }
            }
            Decrementer.decrementRequests += button("Decrementer 1").actionEvents()
            Decrementer.decrementRequests += button("Decrementer 2").actionEvents()
        }
    }
}
class ObservableBus<T>(cacheCount: Int = 0) {
    // PublishSubject has no public constructor, so use the create() factory
    private val subject: SerializedSubject<Observable<T>, Observable<T>> = PublishSubject.create<Observable<T>>().toSerialized()
    private val live: MutableSet<Observable<T>> = ConcurrentHashMap.newKeySet<Observable<T>>()
    // Each added source keeps emitting only while it is still registered in `live`
    private val observable = subject.flatMap { obs: Observable<T> -> obs.takeWhile { live.contains(obs) } }
        .observeOnFx().let { if (cacheCount > 0) it.cacheWithInitialCapacity(cacheCount) else it }
    fun toObservable(): Observable<T> = observable
    operator fun plusAssign(observable: Observable<T>) = add(observable)
    operator fun minusAssign(observable: Observable<T>) = remove(observable)
    fun add(observable: Observable<T>) {
        live.add(observable)
        subject.onNext(observable)
    }
    fun remove(observable: Observable<T>) {
        live.remove(observable)
    }
}
Another issue is the subscriptions seem to have to happen before the source Observables are added. Otherwise nothing is emitted.
This works fine:
label("start") {
    Decrementer.decrements
        .doOnNext { if (it == 0.0) throw Exception("Zero not allowed!") }
        .retry()
        .subscribe { text = it.toString() }
}
Decrementer.decrementRequests += button("Decrementer 1").actionEvents()
Decrementer.decrementRequests += button("Decrementer 2").actionEvents()
This does not:
Decrementer.decrementRequests += button("Decrementer 1").actionEvents()
Decrementer.decrementRequests += button("Decrementer 2").actionEvents()
label("start") {
    Decrementer.decrements
        .doOnNext { if (it == 0.0) throw Exception("Zero not allowed!") }
        .retry()
        .subscribe { text = it.toString() }
}
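The ordering problem is consistent with how a PublishSubject behaves: it only forwards items to subscribers that are already attached, so sources pushed into it before the first subscription are lost. Below is a minimal sketch, assuming RxJava 1.x, with the helper lateSubscriber being a hypothetical name. The ReplaySubject comparison is one possible workaround shown for contrast only, not something adopted in this thread.

```kotlin
import rx.subjects.PublishSubject
import rx.subjects.ReplaySubject
import rx.subjects.Subject

// Hypothetical helper: push one item before subscribing and one after,
// then report what the subscriber actually saw.
fun lateSubscriber(subject: Subject<String, String>): List<String> {
    val received = mutableListOf<String>()
    subject.onNext("before")
    subject.subscribe { received += it }
    subject.onNext("after")
    return received
}

fun main() {
    // PublishSubject drops anything emitted before the subscription.
    println(lateSubscriber(PublishSubject.create<String>())) // [after]
    // ReplaySubject buffers earlier items and replays them to late subscribers.
    println(lateSubscriber(ReplaySubject.create<String>()))  // [before, after]
}
```

An unbounded ReplaySubject keeps every item it has seen, so it trades the ordering problem for memory growth.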
from rxjavafx.
Alright, I got a JavaFX-driven solution using an ObservableList and a switchMap(). This seems to work perfectly. Error handling is predictable too, and I completely avoided using Subjects.
class ObservableBus<T>(val cacheCount: Int = 0) {
    private val sources = FXCollections.synchronizedObservableList(FXCollections.observableArrayList<Observable<T>>())
    // On every change to the list, drop the previous merge and merge the current sources
    private val observable = sources.onChangedObservable()
        .switchMap { list -> Observable.from(list).flatMap { it } }
        .observeOnFx().let { if (cacheCount > 0) it.cacheWithInitialCapacity(cacheCount) else it }
    fun toObservable(): Observable<T> = observable
    operator fun plusAssign(observable: Observable<T>) = add(observable)
    operator fun minusAssign(observable: Observable<T>) = remove(observable)
    fun add(observable: Observable<T>) {
        sources.add(observable)
    }
    fun remove(observable: Observable<T>) {
        sources.remove(observable)
    }
}
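The switchMap() idea can be tried without JavaFX. The sketch below assumes RxJava 1.x and replaces the ObservableList change stream with a hypothetical PublishSubject named sourceLists that emits the current list of sources. The subjects a and b are likewise illustrative. Each new list makes switchMap() unsubscribe from the previous merge and subscribe to the sources in the new list.

```kotlin
import rx.Observable
import rx.subjects.PublishSubject

fun switchMapDemo(): List<Int> {
    // Stand-in for the list-change stream: emits the full current source list.
    val sourceLists = PublishSubject.create<List<Observable<Int>>>()
    val a = PublishSubject.create<Int>()
    val b = PublishSubject.create<Int>()
    val received = mutableListOf<Int>()

    sourceLists
        .switchMap { list -> Observable.from(list).flatMap { it } }
        .subscribe { received += it }

    sourceLists.onNext(listOf<Observable<Int>>(a))    // only a is live
    a.onNext(1)
    b.onNext(100)                                     // b not added yet: dropped
    sourceLists.onNext(listOf<Observable<Int>>(a, b)) // add b
    a.onNext(2)
    b.onNext(200)
    sourceLists.onNext(listOf<Observable<Int>>(b))    // remove a
    a.onNext(3)                                       // a removed: dropped
    b.onNext(300)

    return received
}

fun main() {
    println(switchMapDemo()) // [1, 2, 200, 300]
}
```

Because every change resubscribes to all remaining sources, this pattern suits hot sources such as UI events. Cold sources would restart from their first item on each change.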
from rxjavafx.
Here's a simpler example I was using to test retry() against an error. The moment it hits 0 and throws the error, it resubscribes and starts all over like it should.
class MyApp : App() {
    override val primaryView = TestView::class
}
object EventModel {
    val decrementRequests = ObservableBus<ActionEvent>()
}
class TestView : View() {
    override val root = VBox()
    init {
        with(root) {
            label("start") {
                EventModel.decrementRequests.toObservable()
                    .map { 1 }.scan(10) { x, y -> x - y }
                    .doOnNext { if (it == 0) throw Exception("Zero not allowed!") }
                    .retry(2)
                    .subscribe { text = it.toString() }
            }
            // Buttons must feed the same bus the label subscribes to
            EventModel.decrementRequests += button("Decrementer 1").actionEvents()
            EventModel.decrementRequests += button("Decrementer 2").actionEvents()
        }
    }
}
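The resubscribe-and-restart behavior can also be seen without a UI. This is a minimal sketch, assuming RxJava 1.x, that swaps the button events for a cold Observable.just(2, 1, 0) (an illustrative substitute, not the code above). With retry(1), the failing sequence is subscribed to one extra time before the error is passed on.

```kotlin
import rx.Observable

fun retryDemo(): List<String> {
    val log = mutableListOf<String>()

    Observable.just(2, 1, 0)
        .doOnNext { if (it == 0) throw IllegalStateException("Zero not allowed!") }
        .retry(1) // resubscribe once, then give up and forward the error
        .subscribe(
            { log += "next $it" },
            { log += "error ${it.message}" }
        )

    return log
}

fun main() {
    println(retryDemo())
    // [next 2, next 1, next 2, next 1, error Zero not allowed!]
}
```

Once the retries are exhausted the subscriber receives the error and nothing more, so a UI subscription that should survive needs either an unbounded retry() or an onError handler.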
Unless anybody questions this or thinks this is too niche, I'll probably implement this in RxJavaFX and RxKotlinFX. Is there a better name than ObservableBus?
from rxjavafx.
@JakeWharton has come up with a better name for this utility. We shall call it CompositeObservable.
from rxjavafx.
I proposed this in RxJava just to see if it gets shot down or not. ReactiveX/RxJava#4235
from rxjavafx.
I went ahead and implemented this. If by any chance it goes into another library like RxJava or RxJava-Extras, I may deprecate it later. For now I'd like it to be included.
from rxjavafx.