
Comments (11)

thomasnield commented on May 14, 2024

Tagging you since you're pretty objective @raniejade. What do you think?

from rxjavafx.

raniejade commented on May 14, 2024

Hmmm, kinda looks like Observable#merge to me. I do find it useful, but a bit redundant.


thomasnield commented on May 14, 2024

Yes, it is basically Observable#merge, but it allows Observables to be added to or removed from a centralized instance at any time. Additions and removals also affect all existing subscriptions. I'll create an example later showing why this is needed. Without it, you can get into a bit of trouble using Observable#merge with complex UIs.
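To make the limitation concrete, here is a minimal sketch, assuming RxJava 1.x (package `rx`) is on the classpath. The subject names are hypothetical stand-ins for UI event streams. It shows that `Observable.merge` only sees the sources passed in when the chain is assembled, so a source that shows up later is never picked up by existing subscribers.

```kotlin
import rx.Observable
import rx.subjects.PublishSubject

// Returns everything a subscriber to merge(buttonA, buttonB) receives.
fun mergedAtAssembly(): List<String> {
    // Hot sources standing in for button click streams (hypothetical names).
    val buttonA = PublishSubject.create<String>()
    val buttonB = PublishSubject.create<String>()
    val buttonC = PublishSubject.create<String>() // created "later", never merged

    val received = mutableListOf<String>()

    // merge() captures exactly the sources handed to it here.
    Observable.merge(buttonA, buttonB).subscribe { received.add(it) }

    buttonA.onNext("A1")
    buttonB.onNext("B1")
    buttonC.onNext("C1") // not part of the merge, so it is never delivered

    return received
}

fun main() {
    println(mergedAtAssembly()) // [A1, B1]
}
```

Getting `buttonC` into the stream would mean rebuilding the merge and resubscribing every consumer, which is the bookkeeping a centralized, mutable instance is meant to avoid.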


raniejade commented on May 14, 2024

Follow-up question: what happens when one of the source Observables terminates with an error?


thomasnield commented on May 14, 2024

Good question. I think the pooled Observable will continue on without it unless you use a retry. I'll need to do some tests today regarding error behavior, but I think it may be on the developer to handle it.
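As a baseline for those tests, here is a minimal sketch of how plain `Observable.merge` treats a failing source, assuming RxJava 1.x. The names are hypothetical. With the standard operator, an error from any one source terminates the merged stream, so the remaining sources are unsubscribed as well. A pooled implementation built on `merge` or `flatMap` would likely inherit this behavior unless errors are handled per source.

```kotlin
import rx.Observable
import rx.subjects.PublishSubject

// Logs what a subscriber sees when one of two merged sources fails.
fun mergeWithFailingSource(): List<String> {
    val healthy = PublishSubject.create<String>()
    val failing = PublishSubject.create<String>()
    val log = mutableListOf<String>()

    Observable.merge(healthy, failing).subscribe(
        { log.add("next: $it") },
        { log.add("error: ${it.message}") }
    )

    healthy.onNext("h1")
    failing.onError(RuntimeException("boom")) // terminates the merged stream
    healthy.onNext("h2")                      // dropped: merge already unsubscribed

    return log
}

fun main() {
    println(mergeWithFailingSource()) // [next: h1, error: boom]
}
```

RxJava 1.x also offers `Observable.mergeDelayError`, which holds errors back until the other sources finish. Whether that fits a UI event pool is a separate design question.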


thomasnield commented on May 14, 2024

Alright, I discovered a few issues. The error communication is not working like I expected: the bus just stops pushing items when an error occurs downstream. Subjects are something I am not an expert in, especially when it comes to error handling.

Here is my test app.

class MyApp : App() {
    override val primaryView = TestView::class
}


object Decrementer {

    val numbers = Observable.just(5.0,4.0,3.0,2.0,1.0,0.0,-1.0,-2.0, -3.0)
    val decrementRequests = ObservableBus<ActionEvent>()

    val decrements = decrementRequests.toObservable()
            .zipWith(numbers) { ae,dbl -> dbl }
}


class TestView: View() {
    override val root = VBox()


    init {
        with(root) {
            label("start") {
                Decrementer.decrements
                        .doOnNext { if (it == 0.0) throw Exception("Zero not allowed!") }
                        .retry()
                        .subscribe { text = it.toString() }
            }
            Decrementer.decrementRequests += button("Decrementer 1").actionEvents()
            Decrementer.decrementRequests += button("Decrementer 2").actionEvents()
        }
    }
}


class ObservableBus<T>(cacheCount: Int = 0) {

    private val subject: SerializedSubject<Observable<T>, Observable<T>> = PublishSubject<Observable<T>>().toSerialized()
    private val live: MutableSet<Observable<T>> = ConcurrentHashMap.newKeySet<Observable<T>>()

    private val observable = subject.flatMap { obs: Observable<T> -> obs.takeWhile { live.contains(obs) } }
           .observeOnFx().let { if (cacheCount > 0) it.cacheWithInitialCapacity(cacheCount) else it }

    fun toObservable(): Observable<T> = observable

    operator fun plusAssign(observable: Observable<T>) = add(observable)
    operator fun minusAssign(observable: Observable<T>) = remove(observable)

    fun add(observable: Observable<T>) {
        live.add(observable)
        subject.onNext(observable)
    }
    fun remove(observable: Observable<T>) {
        live.remove(observable)
    }
}

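Another issue is that the subscriptions seem to have to happen before the source Observables are added. Otherwise nothing is emitted, presumably because the PublishSubject does not replay Observables that were pushed before a subscriber arrived.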

This works fine:

label("start") {
    Decrementer.decrements
            .doOnNext { if (it == 0.0) throw Exception("Zero not allowed!") }
            .retry()
            .subscribe { text = it.toString() }
}
Decrementer.decrementRequests += button("Decrementer 1").actionEvents()
Decrementer.decrementRequests += button("Decrementer 2").actionEvents()

This does not:

Decrementer.decrementRequests += button("Decrementer 1").actionEvents()
Decrementer.decrementRequests += button("Decrementer 2").actionEvents()
label("start") {
    Decrementer.decrements
            .doOnNext { if (it == 0.0) throw Exception("Zero not allowed!") }
            .retry()
            .subscribe { text = it.toString() }
}
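The ordering problem above is consistent with how `PublishSubject` behaves in general. Here is a minimal sketch, assuming RxJava 1.x, with strings standing in for the source Observables pushed through the bus. Anything sent through a `PublishSubject` before a subscriber arrives is simply not delivered to that subscriber.

```kotlin
import rx.subjects.PublishSubject

// Returns what an early and a late subscriber each receive.
fun lateSubscriberMissesSources(): Pair<List<String>, List<String>> {
    val bus = PublishSubject.create<String>()
    val early = mutableListOf<String>()
    val late = mutableListOf<String>()

    bus.subscribe { early.add(it) }
    bus.onNext("source-1")          // only the early subscriber exists yet
    bus.subscribe { late.add(it) }
    bus.onNext("source-2")          // both subscribers see this one

    return early to late
}

fun main() {
    val (early, late) = lateSubscriberMissesSources()
    println(early) // [source-1, source-2]
    println(late)  // [source-2]
}
```

In the ObservableBus above, the items on the subject are the source Observables themselves, so a source added before the label subscribes would be lost in the same way.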


thomasnield commented on May 14, 2024

Alright, I got a JavaFX-driven solution using an ObservableList and a switchMap(). This seems to work perfectly. Error handling is predictable too, and I completely avoided using Subjects.

class ObservableBus<T>(val cacheCount: Int = 0) {

    private val sources = FXCollections.synchronizedObservableList(FXCollections.observableArrayList<Observable<T>>())

    private val observable = sources.onChangedObservable()
            .switchMap { currentSources -> Observable.from(currentSources).flatMap { source -> source } }
            .observeOnFx().let { if (cacheCount > 0) it.cacheWithInitialCapacity(cacheCount) else it }

    fun toObservable(): Observable<T> = observable

    operator fun plusAssign(observable: Observable<T>) = add(observable)
    operator fun minusAssign(observable: Observable<T>) = remove(observable)

    fun add(observable: Observable<T>) {
        sources.add(observable)
    }
    fun remove(observable: Observable<T>) {
        sources.remove(observable)
    }
}
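For readers without JavaFX at hand, the same switchMap idea can be sketched with a `BehaviorSubject` holding the current list of sources. This is a hypothetical stand-in, not the implementation from this thread: `SimpleComposite` and its members are made-up names, it assumes RxJava 1.x, and it is not thread-safe.

```kotlin
import rx.Observable
import rx.subjects.BehaviorSubject
import rx.subjects.PublishSubject

// Hypothetical JavaFX-free analogue of the ObservableList + switchMap approach.
class SimpleComposite<T> {
    private val sources = mutableListOf<Observable<T>>()
    // BehaviorSubject replays the latest snapshot to late subscribers.
    private val snapshots = BehaviorSubject.create<List<Observable<T>>>(emptyList())

    // On every change, drop the old merge and subscribe to the new set of sources.
    fun toObservable(): Observable<T> =
        snapshots.switchMap { current -> Observable.merge(current) }

    fun add(source: Observable<T>) {
        sources.add(source)
        snapshots.onNext(sources.toList())
    }

    fun remove(source: Observable<T>) {
        sources.remove(source)
        snapshots.onNext(sources.toList())
    }
}

fun compositeDemo(): List<String> {
    val composite = SimpleComposite<String>()
    val a = PublishSubject.create<String>()
    val b = PublishSubject.create<String>()
    val seen = mutableListOf<String>()

    composite.add(a)                                    // added before anyone subscribes
    composite.toObservable().subscribe { seen.add(it) }
    composite.add(b)                                    // added after subscription
    a.onNext("a1")
    b.onNext("b1")
    composite.remove(a)
    a.onNext("a2")                                      // removed source is ignored
    b.onNext("b2")
    return seen
}

fun main() {
    println(compositeDemo()) // [a1, b1, b2]
}
```

One design consequence worth noting: switchMap resubscribes to every source whenever the set changes. That is harmless for hot sources such as UI events, but a cold source would restart from its beginning on each add or remove.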


thomasnield commented on May 14, 2024

Here's a simpler example I was using to test retry() against an error. The moment it hits 0 and throws the error, it resubscribes and starts all over like it should.

class MyApp : App() {
    override val primaryView = TestView::class
}

object EventModel {
    val decrementRequests = ObservableBus<ActionEvent>()
}

class TestView: View() {
    override val root = VBox()

    init {
        with(root) {
            label("start") {
                EventModel.decrementRequests.toObservable()
                        .map { 1 }.scan(10) { x,y -> x - y }
                        .doOnNext { if (it == 0) throw Exception("Zero not allowed!") }
                        .retry(2)
                        .subscribe { text = it.toString() }
            }
            // Feed both buttons into the EventModel bus declared above
            EventModel.decrementRequests += button("Decrementer 1").actionEvents()
            EventModel.decrementRequests += button("Decrementer 2").actionEvents()
        }
    }
}
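The retry behavior can also be checked without a UI. Here is a minimal sketch, assuming RxJava 1.x, where a `PublishSubject` named `clicks` (a hypothetical stand-in for the button events) drives a countdown from 3 instead of 10. Each time the countdown reaches 0 the chain throws, and `retry(2)` resubscribes so `scan` starts again from its seed. After the second retry is used up, the error reaches the subscriber.

```kotlin
import rx.subjects.PublishSubject

// Logs the values and the final error seen by a subscriber behind retry(2).
fun countdownWithRetry(): List<String> {
    val clicks = PublishSubject.create<Unit>()
    val log = mutableListOf<String>()

    clicks.map { 1 }
        .scan(3) { total, step -> total - step }
        .doOnNext { if (it == 0) throw IllegalStateException("Zero not allowed!") }
        .retry(2) // resubscribe at most twice, then pass the error downstream
        .subscribe(
            { log.add(it.toString()) },
            { log.add("error: ${it.message}") }
        )

    // Nine clicks: enough to hit zero three times with a seed of 3.
    repeat(9) { clicks.onNext(Unit) }
    return log
}

fun main() {
    countdownWithRetry().forEach(::println)
}
```

The explicit error handler in `subscribe` matters here: once the retries are exhausted the error is delivered downstream, and in RxJava 1.x a subscriber without an error handler would raise `OnErrorNotImplementedException` at that point.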

Unless anybody questions this or thinks this is too niche, I'll probably implement this in RxJavaFX and RxKotlinFX. Is there a better name than ObservableBus?


thomasnield commented on May 14, 2024

@JakeWharton has come up with a better name for this utility. We shall call it CompositeObservable.


thomasnield commented on May 14, 2024

I proposed this in RxJava just to see if it gets shot down or not. ReactiveX/RxJava#4235


thomasnield commented on May 14, 2024

I went ahead and implemented this. If by any chance it goes into another library like RxJava or RxJava-Extras, I may deprecate it later. For now I'd like it to be included.

