A collection of Rx operators & tools not found in the core RxSwift distribution

License: MIT License

rxswiftext's Introduction

RxSwiftExt

If you're using RxSwift, you may have encountered situations where the built-in operators do not bring the exact functionality you want. The RxSwift core is being intentionally kept as compact as possible to avoid bloat. This repository's purpose is to provide additional convenience operators and Reactive Extensions.

Installation

This branch of RxSwiftExt targets Swift 5.x and RxSwift 5.0.0 or later.

  • If you're looking for the Swift 4 version of RxSwiftExt, please use version 3.4.0 of the framework.

CocoaPods

Add to your Podfile:

pod 'RxSwiftExt', '~> 5'

This will install both the RxSwift and RxCocoa extensions. If you're interested in only installing the RxSwift extensions, without the RxCocoa extensions, simply use:

pod 'RxSwiftExt/Core'

Using Swift 4:

pod 'RxSwiftExt', '~> 3'

Carthage

Add this to your Cartfile:

github "RxSwiftCommunity/RxSwiftExt"

Operators

RxSwiftExt is all about adding operators and Reactive Extensions to RxSwift!

Operators

These operators are much like the RxSwift & RxCocoa core operators, but provide additional useful abilities to your Rx arsenal.

There are two more available operators for materialize()'d sequences: errors and elements.

Read below for details about each operator.

Reactive Extensions

RxSwift/RxCocoa Reactive Extensions are provided to enhance existing objects and classes from the Apple-ecosystem with Reactive abilities.


Operator details

unwrap

Unwrap optionals and filter out nil values.

  Observable.of(1,2,nil,Int?(4))
    .unwrap()
    .subscribe { print($0) }
next(1)
next(2)
next(4)

ignore

Ignore specific elements.

  Observable.from(["One","Two","Three"])
    .ignore("Two")
    .subscribe { print($0) }
next(One)
next(Three)
completed

ignoreWhen

Ignore elements according to closure.

  Observable<Int>
    .of(1,2,3,4,5,6)
    .ignoreWhen { $0 > 2 && $0 < 6 }
    .subscribe { print($0) }
next(1)
next(2)
next(6)
completed

once

Send a next element exactly once to the first subscriber that takes it. Further subscribers get an empty sequence.

  let obs = Observable.once("Hello world")
  print("First")
  obs.subscribe { print($0) }
  print("Second")
  obs.subscribe { print($0) }
First
next(Hello world)
completed
Second
completed

distinct

Pass elements through only if they were never seen before in the sequence.

Observable.of("a","b","a","c","b","a","d")
    .distinct()
    .subscribe { print($0) }
next(a)
next(b)
next(c)
next(d)
completed
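
As a rough illustration of the semantics only (not the library's implementation), the same "first occurrence wins" filtering can be sketched over a plain array. The helper name `distinctElements` is hypothetical.

```swift
// Hypothetical array analogue of distinct(): keep only the first occurrence of each value.
func distinctElements<T: Hashable>(_ items: [T]) -> [T] {
    var seen = Set<T>()
    // Set.insert reports whether the value was newly inserted.
    return items.filter { seen.insert($0).inserted }
}

let uniqueLetters = distinctElements(["a", "b", "a", "c", "b", "a", "d"])
print(uniqueLetters) // ["a", "b", "c", "d"]
```

Like the operator, this sketch has to remember every value it has seen, so memory use grows with the number of distinct elements.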

mapTo

Replace every element with the provided value.

Observable.of(1,2,3)
    .mapTo("Nope.")
    .subscribe { print($0) }
next(Nope.)
next(Nope.)
next(Nope.)
completed

mapAt

Transform every element to the value at the provided key path.

struct Person {
    let name: String
}

Observable
    .of(
        Person(name: "Bart"),
        Person(name: "Lisa"),
        Person(name: "Maggie")
    )
    .mapAt(\.name)
    .subscribe { print($0) }
next(Bart)
next(Lisa)
next(Maggie)
completed

not

Negate booleans.

Observable.just(false)
    .not()
    .subscribe { print($0) }
next(true)
completed

and

Verifies that every value emitted is true

Observable.of(true, true)
	.and()
	.subscribe { print($0) }

Observable.of(true, false)
	.and()
	.subscribe { print($0) }

Observable<Bool>.empty()
	.and()
	.subscribe { print($0) }

Returns a Maybe<Bool>:

success(true)
success(false)
completed
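
The three outcomes above can be modelled on plain arrays, assuming `nil` stands in for a Maybe that completes without emitting a value. This is only a sketch of the semantics; `allTrue` is a hypothetical name, not part of the library.

```swift
// Hypothetical array analogue of and(): nil models "completed without a value".
func allTrue(_ values: [Bool]) -> Bool? {
    guard !values.isEmpty else { return nil }
    return values.allSatisfy { $0 }
}

let andResults: [Bool?] = [
    allTrue([true, true]),   // every value is true
    allTrue([true, false]),  // at least one value is false
    allTrue([])              // empty input: no verdict at all
]
print(andResults) // [Optional(true), Optional(false), nil]
```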

cascade

Sequentially cascade through a list of observables, dropping previous subscriptions as soon as an observable further down the list starts emitting elements.

let a = PublishSubject<String>()
let b = PublishSubject<String>()
let c = PublishSubject<String>()
Observable.cascade([a,b,c])
    .subscribe { print($0) }
a.onNext("a:1")
a.onNext("a:2")
b.onNext("b:1")
a.onNext("a:3")
c.onNext("c:1")
a.onNext("a:4")
b.onNext("b:4")
c.onNext("c:2")
next(a:1)
next(a:2)
next(b:1)
next(c:1)
next(c:2)
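
To make the switching rule concrete, here is a small model of it over a list of tagged events, where each event carries the index of the observable that produced it. This is a sketch under that assumption, not the library's implementation, and `cascadeModel` is a hypothetical name.

```swift
// Hypothetical model of cascade: each event is (index of emitting observable, value).
func cascadeModel(_ events: [(Int, String)]) -> [String] {
    var current = 0            // index of the observable currently being forwarded
    var output: [String] = []
    for (source, value) in events where source >= current {
        current = source       // moving forward drops every earlier observable
        output.append(value)
    }
    return output
}

let forwarded = cascadeModel([
    (0, "a:1"), (0, "a:2"), (1, "b:1"), (0, "a:3"),
    (2, "c:1"), (0, "a:4"), (1, "b:4"), (2, "c:2")
])
print(forwarded) // ["a:1", "a:2", "b:1", "c:1", "c:2"]
```

Once `b` has emitted, later elements from `a` are dropped; once `c` has emitted, both `a` and `b` are dropped.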

pairwise

Groups elements emitted by an Observable into tuples, where each tuple consists of the last 2 consecutive items; similar to a sliding window.

Observable.from([1, 2, 3, 4, 5, 6])
    .pairwise()
    .subscribe { print($0) }
next((1, 2))
next((2, 3))
next((3, 4))
next((4, 5))
next((5, 6))
completed

nwise

Groups elements emitted by an Observable into arrays, where each array consists of the last N consecutive items; similar to a sliding window.

Observable.from([1, 2, 3, 4, 5, 6])
    .nwise(3)
    .subscribe { print($0) }
next([1, 2, 3])
next([2, 3, 4])
next([3, 4, 5])
next([4, 5, 6])
completed
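
The sliding-window behaviour can be sketched on a plain array. This is only an analogue for illustration (`slidingWindows` is a hypothetical helper, not the operator's implementation); the operator itself works incrementally as elements arrive.

```swift
// Hypothetical array analogue of nwise(n): every run of n consecutive items.
func slidingWindows<T>(_ items: [T], size n: Int) -> [[T]] {
    // No window can be formed until at least n items exist.
    guard n > 0, items.count >= n else { return [] }
    return (0...(items.count - n)).map { start in
        Array(items[start..<(start + n)])
    }
}

let windows = slidingWindows([1, 2, 3, 4, 5, 6], size: 3)
print(windows) // [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]
```

With a size of 2 the windows correspond to the pairs produced by pairwise.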

retry

Repeats the source observable sequence using the given behavior in case of an error, or until it terminates successfully. There are four behaviors with various predicate and delay options: immediate, delayed, exponentialDelayed and customTimerDelayed.

// in case of an error, the initial delay will be 1 second;
// every subsequent delay will be doubled
// delay formula is: initial * pow(1 + multiplier, Double(currentAttempt - 1)), so a multiplier of 1.0 doubles the delay each time
_ = sampleObservable.retry(.exponentialDelayed(maxCount: 3, initial: 1.0, multiplier: 1.0), scheduler: delayScheduler)
    .subscribe(onNext: { event in
        print("Receive event: \(event)")
    }, onError: { error in
        print("Receive error: \(error)")
    })
Receive event: First
Receive event: Second
Receive event: First
Receive event: Second
Receive event: First
Receive event: Second
Receive error: fatalError
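
The delay formula quoted in the comment can be evaluated directly. The sketch below only reproduces that arithmetic; `exponentialDelay` is a hypothetical helper, not a library function.

```swift
import Foundation

// Hypothetical helper: initial * pow(1 + multiplier, Double(currentAttempt - 1))
func exponentialDelay(initial: Double, multiplier: Double, attempt: Int) -> Double {
    return initial * pow(1 + multiplier, Double(attempt - 1))
}

// Delays (in seconds) for attempts 1 through 3 with initial 1.0 and multiplier 1.0.
let delays = (1...3).map { exponentialDelay(initial: 1.0, multiplier: 1.0, attempt: $0) }
print(delays) // [1.0, 2.0, 4.0]
```

A multiplier other than 1.0 changes the growth factor to `1 + multiplier`; for example, 1.2 multiplies each delay by 2.2.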

repeatWithBehavior

Repeats the source observable sequence using the given behavior when it completes. This operator takes the same parameters as the retry operator. There are four behaviors with various predicate and delay options: immediate, delayed, exponentialDelayed and customTimerDelayed.

// when the sequence completes, the initial delay will be 1 second;
// every subsequent delay is multiplied by (1 + multiplier)
// delay formula is: initial * pow(1 + multiplier, Double(currentAttempt - 1)), so the multiplier of 1.2 used here multiplies the delay by 2.2 each time
_ = completingObservable.repeatWithBehavior(.exponentialDelayed(maxCount: 3, initial: 1.0, multiplier: 1.2), scheduler: delayScheduler)
    .subscribe(onNext: { event in
        print("Receive event: \(event)")
    })
Receive event: First
Receive event: Second
Receive event: First
Receive event: Second
Receive event: First
Receive event: Second

catchErrorJustComplete

Completes a sequence when an error occurs, dismissing the error condition

let _ = sampleObservable
    .do(onError: { print("Source observable emitted error \($0), ignoring it") })
    .catchErrorJustComplete()
    .subscribe {
        print("\($0)")
    }
next(First)
next(Second)
Source observable emitted error fatalError, ignoring it
completed

pausable

Pauses the elements of the source observable sequence unless the latest element from the second observable sequence is true.

let observable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)

let trueAtThreeSeconds = Observable<Int>.timer(.seconds(3), scheduler: MainScheduler.instance).map { _ in true }
let falseAtFiveSeconds = Observable<Int>.timer(.seconds(5), scheduler: MainScheduler.instance).map { _ in false }
let pauser = Observable.of(trueAtThreeSeconds, falseAtFiveSeconds).merge()

let pausedObservable = observable.pausable(pauser)

let _ = pausedObservable
    .subscribe { print($0) }
next(2)
next(3)

More examples are available in the project's Playground.

pausableBuffered

Pauses the elements of the source observable sequence unless the latest element from the second observable sequence is true. Elements emitted by the source observable are buffered (with a configurable limit) and "flushed" (re-emitted) when the observable resumes.

Examples are available in the project's Playground.

apply

apply provides a unified mechanism for applying transformations to Observable sequences, without having to extend ObservableType or repeat your transformations. For additional rationale, see the discussion on GitHub.

// An ordinary function that applies some operators to its argument, and returns the resulting Observable
func requestPolicy(_ request: Observable<Void>) -> Observable<Response> {
    return request.retry(maxAttempts)
        .do(onNext: sideEffect)
        .map { Response.success }
        .catchError { error in Observable.just(parseRequestError(error: error)) }
}

// We can apply the function in the apply operator, which preserves the chaining style of invoking Rx operators
let resilientRequest = request.apply(requestPolicy)

filterMap

A common pattern in Rx is to filter out some values, then map the remaining ones to something else. filterMap allows you to do this in one step:

// keep only even numbers and double them
Observable.of(1,2,3,4,5,6)
	.filterMap { number in
		(number % 2 == 0) ? .map(number * 2) : .ignore
	}

The sequence above keeps even numbers 2, 4, 6 and produces the sequence 4, 8, 12.
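
To show why a single closure can both filter and transform, here is a self-contained array analogue. The names `FilterMapResult` and `filterMapped` are hypothetical stand-ins chosen for this sketch; it is not the library's implementation.

```swift
// Hypothetical stand-in for the result type returned by the closure.
enum FilterMapResult<Value> {
    case ignore
    case map(Value)
}

extension Sequence {
    // Hypothetical array analogue of filterMap: one pass that filters and transforms.
    func filterMapped<Value>(_ transform: (Element) -> FilterMapResult<Value>) -> [Value] {
        var output: [Value] = []
        for element in self {
            // Only .map results are kept; .ignore results are dropped.
            if case .map(let value) = transform(element) {
                output.append(value)
            }
        }
        return output
    }
}

// Keep only even numbers and double them.
let doubledEvens = [1, 2, 3, 4, 5, 6].filterMapped { number -> FilterMapResult<Int> in
    number % 2 == 0 ? .map(number * 2) : .ignore
}
print(doubledEvens) // [4, 8, 12]
```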

errors, elements

These operators only apply to observable sequences that have been materialized with the materialize() operator (from RxSwift core). errors returns a sequence of filtered error events, omitting elements. elements returns a sequence of filtered element events, omitting errors.

let imageResult = _chooseImageButtonPressed.asObservable()
    .flatMap { imageReceiver.image.materialize() }
    .share()

let image = imageResult
    .elements()
    .asDriver(onErrorDriveWith: .never())

let errorMessage = imageResult
    .errors()
    .map(mapErrorMessages)
    .unwrap()
    .asDriver(onErrorDriveWith: .never())

fromAsync

Turns simple asynchronous completion handlers into observable sequences. Suitable for use with existing asynchronous services which call a completion handler with only one parameter. Emits the result produced by the completion handler then completes.

func someAsynchronousService(arg1: String, arg2: Int, completionHandler: (String) -> Void) {
    // a service that asynchronously calls
    // the given completionHandler
}

let observableService = Observable
    .fromAsync(someAsynchronousService)

observableService("Foo", 0)
    .subscribe(onNext: { (result) in
        print(result)
    })
    .disposed(by: disposeBag)

zip(with:)

Convenience version of Observable.zip(_:). Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences have produced an element at a corresponding index.

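// sample inputs (assumed here for illustration; they match the output below)
let numbers = [1, 2, 3]
let strings = ["a", "b", "c"]

let first = Observable.from(numbers)
let second = Observable.from(strings)

first.zip(with: second) { i, s in
        s + String(i)
    }.subscribe(onNext: { (result) in
        print(result)
    })
a1
b2
c3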

merge(with:)

Convenience version of Observable.merge(_:). Merges elements from the observable sequence with those of one or more other observable sequences into a single observable sequence.

let oddStream = Observable.of(1, 3, 5)
let evenStream = Observable.of(2, 4, 6)
let otherStream = Observable.of(1, 5, 6)

oddStream.merge(with: evenStream, otherStream)
    .subscribe(onNext: { result in
        print(result)
    })
1 2 1 3 4 5 5 6 6

ofType

The ofType operator filters the elements of an observable sequence, keeping only those that are instances of the supplied type.

Observable.of(NSNumber(value: 1),
                  NSDecimalNumber(string: "2"),
                  NSNumber(value: 3),
                  NSNumber(value: 4),
                  NSDecimalNumber(string: "5"),
                  NSNumber(value: 6))
        .ofType(NSDecimalNumber.self)
        .subscribe { print($0) }
next(2)
next(5)
completed

This example emits 2, 5 (NSDecimalNumber Type).

count

Emits the number of items emitted by an Observable once it terminates with no errors. If a predicate is given, only elements matching the predicate will be counted.

Observable.from([1, 2, 3, 4, 5, 6])
    .count { $0 % 2 == 0 }
    .subscribe { print($0) }
next(3)
completed

partition

Partition a stream into two separate streams of elements that match, and don't match, the provided predicate.

let numbers = Observable
    .of(1, 2, 3, 4, 5, 6)

let (evens, odds) = numbers.partition { $0 % 2 == 0 }

_ = evens.debug("even").subscribe() // emits 2, 4, 6
_ = odds.debug("odds").subscribe() // emits 1, 3, 5

bufferWithTrigger

Collects the elements of the source observable, and emits them as an array when the trigger emits.

let observable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
let signalAtThreeSeconds = Observable<Int>.timer(.seconds(3), scheduler: MainScheduler.instance).map { _ in () }
let signalAtFiveSeconds = Observable<Int>.timer(.seconds(5), scheduler: MainScheduler.instance).map { _ in () }
let trigger = Observable.of(signalAtThreeSeconds, signalAtFiveSeconds).merge()
let buffered = observable.bufferWithTrigger(trigger)
buffered.subscribe { print($0) }
// prints next([0, 1, 2]) @ 3, next([3, 4]) @ 5

A live demonstration is available in the Playground.

Reactive Extensions details

UIViewPropertyAnimator.animate

The animate(afterDelay:) operator provides a Completable that triggers the animation upon subscription and completes when the animation ends.

button.rx.tap
    .flatMap {
        animator1.rx.animate()
            .andThen(animator2.rx.animate(afterDelay: 0.15))
            .andThen(animator3.rx.animate(afterDelay: 0.1))
    }

UIViewPropertyAnimator.fractionComplete

The fractionComplete binder provides a reactive way to bind to UIViewPropertyAnimator.fractionComplete.

slider.rx.value.map(CGFloat.init)
    .bind(to: animator.rx.fractionComplete)

UIScrollView.reachedBottom

reachedBottom provides a sequence that emits every time the UIScrollView is scrolled to the bottom, with an optional offset.

tableView.rx.reachedBottom(offset: 40)
            .subscribe { print("Reached bottom") }

License

This library belongs to RxSwift Community.

RxSwiftExt is available under the MIT license. See the LICENSE file for more info.

rxswiftext's People

Contributors

acchou, andreanmasiro, anton-plebanovich, bobgodwinx, bontojr, carlosypunto, cliss, emorydunn, fpillet, freak4pc, giginet, icanzilb, jdisho, jegnux, jeremiegirault, joihelgi, m0rtymerr, madsbogeskov, michaelavila, noppefoxwolf, patmalt, reloni, samhfrancis, solidcell, taejoongyoon, tcurdt, tgyhlsb, thanegill, vincent-pradeilles, vzsg

rxswiftext's Issues

repeatEvery operator

repeatEvery

Repeats an observable sequence each period.

extension ObservableType {
    /**
     Returns an observable sequence that repeats at each period.

     - seealso: [timer operator on reactivex.io](http://reactivex.io/documentation/operators/timer.html)

     - parameter period: Period to produce subsequent values.
     - returns: An observable sequence that repeat at each period.
     */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func repeatEvery(period: RxTimeInterval, scheduler: SchedulerType)
        -> Observable<E> {
            return Observable<Int>.timer(0, period: period, scheduler: scheduler)
                .flatMapLatest { _ in
                    return self
            }
    }
}

Motivation for inclusion

Cleaner, more readable repetition/polling for an observable.

Example of use

Observable<NSDate>.just(timerEnd)
    .repeatEvery(1, scheduler: MainScheduler.instance)
    .map { $0.timeIntervalSinceNow }
    .map(remainingTime.stringFromTimeInterval)
    .bindTo(timerLabel.rx_text)

Warnings with Xcode 8.3

../Pods/RxSwiftExt/Source/cascade.swift:28:21: Static method should not be declared public because its generic requirement uses an internal type
../Pods/RxSwiftExt/Source/cascade.swift:14:12: Type declared here

Distinct operator

Name and description

distinct()

suppress duplicate items emitted by an Observable

Motivation for inclusion

We have some variants of the distinctUntilChanged operator in the main repo, but we do not have distinct, which is implemented in most other languages that support Rx.

Proposed implementation

extension Observable where Element: Hashable {

   func distinct() -> Observable<Element> {
      var set = Set<Element>()
      return flatMap { element -> Observable<Element> in
         objc_sync_enter(self); defer {objc_sync_exit(self)}
         if set.contains(element) {
            return Observable<Element>.empty()
         } else {
            set.insert(element)
            return Observable<Element>.just(element)
         }
      }
   }
}

concatElements

Name of the operator

concatElements operator ensures sequential delivery of all elements of each inner sequence

Motivation for inclusion

While the merge() operator merges the elements of all inner sequences, it doesn't guarantee order of delivery. This operator guarantees that each new inner sequence emitted by the source will be subscribed to immediately, but the elements it emits will be buffered until all previously emitted sequences have completed.

One example of use is when sending out multiple network requests for elements that should be ordered on output.

If any of the inner sequences errors out, concatElements immediately errors too. Otherwise it completes only when the source observable and all the inner sequences have completed.

Example of usage

[request1, request2].toObservable()
  .concatElements()
  .scan(0) { ($0+1, $1) }
  .subscribeNext { (position, element) in
    print("Received result \(position): \(element)")
  }

Add RxCocoa.framework to Linked Frameworks

Name and description

I'm having trouble building RxSwiftExt with Carthage. I have included RxSwiftExt within my project as a dependent project, but noticed that it does not link to RxCocoa, although there are import RxCocoa dependencies within the source code.

Motivation for inclusion

Explicitly link RxCocoa to allow for a build with Carthage

Please let me know if you would like a PR.

Thanks

Documentation via SourceKitten

So, I've taken a very quick stab at this, as I want to go out for drinks in 10 minutes. So here's the gist. The thing that powers Jazzy could be used to generate READMEs of the public header. Here's a quick POC example I threw together:

doc.rb

  require 'json'
  require 'pathname'

  puts "### RxSwift Extensions\n"

  cache_file = "rxswiftext.json"
  unless File.exist? cache_file
    # `which` prints nothing when the binary is not on the PATH
    raise "Needs SourceKitten installed" if `which sourcekitten`.strip.empty?
    `sourcekitten doc -- -workspace Demo/RxSwiftExtDemo.xcworkspace -scheme RxSwiftExt > #{cache_file}`
  end

  public_data = JSON.parse File.read(cache_file)
  public_data.each do |files|
    files.each do |path, data|
      path = Pathname.new path

      # walk each top-level structure and print its declarations with their doc comments
      data["key.substructure"].each do |roots|
        roots["key.substructure"].each do |decl|
          puts decl["key.parsed_declaration"]
          puts decl["key.doc.comment"]
          puts ""
        end
      end
    end
  end

What this does is create something that looks like this:

$ rerun ruby doc.rb

### RxSwift Extensions

public static func cascade<S : SequenceType where S.Generator.Element == Element, S.Generator.Element.E == T>(observables : S) -> Observable<T>
Cascade through a sequence of observables: every observable that sends a `next` value becomes the "current"
observable (like in `switchLatest`), and the subscription to all previous observables in the sequence is disposed.

This allows subscribing to multiple observable sequences while irrevocably switching to the next when it starts emitting. If any of the
currently subscribed-to sequences errors, the error is propagated to the observer and the sequence terminates.

- parameter observables: a sequence of observables which will all be immediately subscribed to
- returns: An observable sequence that contains elements from the latest observable sequence that emitted elements

public func cascade<S : SequenceType where S.Generator.Element == Self>(next : S) -> Observable<E>
Cascade through a sequence of observables: every observable that sends a `next` value becomes the "current"
observable (like in `switchLatest`), and the subscription to all previous observables in the sequence is disposed.

It uses SourceKitten to doc the module, then currently just outputs the data into a cached JSON file, which looks like this:

screen shot 2016-05-20 at 5 59 46 pm

Cached because running source kitten takes ~30s.

Conceptually, you can use this to do 2 things,

  1. Write great READMEs automatically, thereby keeping documentation with the code.
  2. Force people to write docs. In Danger we enforce that we document all public APIs.

It wouldn't be hard to go from this POC to something real; it's doable in any language that does JSON. Maybe this is something someone wanting to learn a bit more about scripting languages can pick up? I'm happy to jam on it too with someone; we could turn it into a gem and make it available for any Xcodeproj.

Count operator

Name and description

Count operator

Count the number of items emitted by the source Observable and emit only this value

Motivation for inclusion

RxSwift doesn't currently have a count operator.
If you need the number of elements emitted by an Observable, you can write the following code.

[1, 2, 3, 4].toObservable()
    .reduce(0) { a, _ in
        return a + 1
    }

In addition, filtering is as below.

[1, 2, 3, 4].toObservable()
    .filter { $0 <= 2 }
    .reduce(0) { a, _ in
        return a + 1
    }

It is hard to tell at a glance what this code does.
I think a count operator can solve this problem.

[1, 2, 3, 4].toObservable()
    .count { $0 <= 2 }

Example of use

// Count.swift
import Foundation

class CountSink<SourceType, O: ObserverType where O.E == Int>: Sink<O>, ObserverType {
    typealias Predicate = (SourceType) throws -> Bool

    private let _predicate: Predicate?
    private var _count: Int = 0

    init(predicate: Predicate?, observer: O) {
        _predicate = predicate
        super.init(observer: observer)
    }

    func on(event: Event<SourceType>) {
        switch event {
        case .Next(let value):
            do {
                let satisfies = try _predicate?(value)
                if satisfies != false {
                    self._count += 1
                }
            }
            catch let e {
                forwardOn(.Error(e))
                dispose()
            }
        case .Error(let error):
            forwardOn(.Error(error))
            dispose()
        case .Completed:
            forwardOn(.Next(_count))
            forwardOn(.Completed)
            dispose()
        }
    }
}

class Count<SourceType>: Producer<Int> {
    typealias Predicate = (SourceType) throws -> Bool

    private let _source: Observable<SourceType>
    private let _predicate: Predicate?

    init(source: Observable<SourceType>, predicate: Predicate?) {
        _source = source
        _predicate = predicate
    }

    override func run<O: ObserverType where O.E == Int>(observer: O) -> Disposable {
        let sink = CountSink(predicate: _predicate, observer: observer)
        sink.disposable = _source.subscribe(sink)
        return sink
    }
}
// Observable+Aggregate.swift
extension ObservableType {
    public func count(predicate: ((E) throws -> Bool)? = nil)
        -> Observable<Int> {
            return Count(source: self.asObservable(), predicate: predicate)
    }
}
// Example.swift
[1, 2, 3, 4].toObservable()
    .count { $0 <= 2 }
    .subscribeNext { count in
        print(count) // output: 2
    }

Failed to build with Carthage

Carthage 0.17.2

carthage build --platform iOS RxSwiftExt
*** xcodebuild output can be found in /var/folders/_n/p_0zqmfx2qd5vz2pq7n82gfc0000gn/T/carthage-xcodebuild.cfKfVr.log
*** Building scheme "RxBlocking-iOS" in Rx.xcworkspace
*** Building scheme "RxTests-iOS" in Rx.xcworkspace
*** Building scheme "RxCocoa-iOS" in Rx.xcworkspace
*** Building scheme "RxSwift-iOS" in Rx.xcworkspace
*** Building scheme "RxSwiftExtDemo" in RxSwiftExtDemo.xcworkspace
** BUILD FAILED **


The following build commands failed:
    PhaseScriptExecution [CP]\ Check\ Pods\ Manifest.lock /Users/lex/Library/Developer/Xcode/DerivedData/RxSwiftExtDemo-dyyohpxdxqzeeadvjfhygzlesmyx/Build/Intermediates/RxSwiftExtDemo.build/Release-iphoneos/RxSwiftExtDemo.build/Script-6FAF8FD8FC061620E64F107A.sh
(1 failure)
error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.
A shell task (/usr/bin/xcrun xcodebuild -workspace /Users/lex/git/ZUS/Carthage/Checkouts/RxSwiftExt/Demo/RxSwiftExtDemo.xcworkspace -scheme RxSwiftExtDemo -configuration Release -sdk iphoneos ONLY_ACTIVE_ARCH=NO BITCODE_GENERATION_MODE=bitcode CODE_SIGNING_REQUIRED=NO CODE_SIGN_IDENTITY= CARTHAGE=YES clean build) failed with exit code 65:
** BUILD FAILED **


The following build commands failed:
    PhaseScriptExecution [CP]\ Check\ Pods\ Manifest.lock /Users/lex/Library/Developer/Xcode/DerivedData/RxSwiftExtDemo-dyyohpxdxqzeeadvjfhygzlesmyx/Build/Intermediates/RxSwiftExtDemo.build/Release-iphoneos/RxSwiftExtDemo.build/Script-6FAF8FD8FC061620E64F107A.sh
(1 failure)

elements() doesn't resolve from the framework, only source

Thanks for these extensions. I'm using them to work with materialized events for calculations that can fail, but shouldn't cancel the sequence.

Xcode 8.3.2
RxSwiftExt 2.4.1

Using

carthage update --platform iOS --no-use-binaries --cache-builds

And then I insert the framework according to instructions.

I'm getting the following error:

/Users/kevin/src/.../ViewModels/something.swift:66:14: error: value of type 'Observable<Event<Double>>' has no member 'elements'
            .map { $0.tuplename1 }
~~~~~~~~~~~~~^~~~~~~~~~~~~~~~~

(the next line is .elements()) (file and identifier names have been changed for nondisclosure)

Obviously, Observable<Event<Double>> should be the correct type for extension with .elements(), even according to the error message.

So far I've tried cleaning the project, rebuilding all Carthage frameworks, and removing the DerivedData directory.

I know the methods are there:

nm Carthage/Build/iOS/RxSwiftExt.framework/RxSwiftExt | grep elements
000000000000c130 t __TFFe10RxSwiftExtRx7RxSwift14ObservableTypewx1ES0_16EventConvertiblerS1_8elementsFT_GCS0_10ObservableWxS2_11ElementType__U0_FwxS2_WxS2_S5__
000000000000bf30 t __TFFe10RxSwiftExtRx7RxSwift14ObservableTypewx1ES0_16EventConvertiblerS1_8elementsFT_GCS0_10ObservableWxS2_11ElementType__U_FwxS2_Sb
000000000000be40 T __TFe10RxSwiftExtRx7RxSwift14ObservableTypewx1ES0_16EventConvertiblerS1_8elementsfT_GCS0_10ObservableWxS2_11ElementType__
000000000000c2c0 t __TPA__TFFe10RxSwiftExtRx7RxSwift14ObservableTypewx1ES0_16EventConvertiblerS1_8elementsFT_GCS0_10ObservableWxS2_11ElementType__U0_FwxS2_WxS2_S5__
000000000000c0f0 t __TPA__TFFe10RxSwiftExtRx7RxSwift14ObservableTypewx1ES0_16EventConvertiblerS1_8elementsFT_GCS0_10ObservableWxS2_11ElementType__U_FwxS2_Sb

But what's really strange is that if I remove the RxSwiftExt framework from my project, and put in the RxSwiftExt sources from Carthage/Checkouts, it works.

Any ideas?

Distinct Requires Parameter

The distinct operator requires a parameter of type NSPredicate. You can't use distinct just to check whether a string fails to match any of the previous events. This is different from the distinct operator in other libraries such as RxJava.

paginate operator

Name and description

Paginate operator

  • provides pagination behavior

Motivation for inclusion

It was highly inspired by this tricky behavior in the RxExample 👍

The use case is pretty common (from my perspective), so I want to introduce an operator that will provide this behavior.

Proposed implementation

extension ObservableType {
   func paginate<O: ObservableType>(nextPageTrigger: O,
                 hasNextPage: E -> Bool,
                 nextPageFactory: E -> Observable<E>) -> Observable<E> {
      return flatMap { page -> Observable<E> in
         if !hasNextPage(page) {
            return Observable.just(page)
         }
         return [
            Observable.just(page),
            Observable.never().takeUntil(nextPageTrigger),
            nextPageFactory(page)
         ].concat()
      }
   }
}

Example of use

   func paginateItems(batch: Batch = Batch.initial,
                      endPoint: EndPoint,
                      nextBatchTrigger: Observable<Void>) -> Observable<Page<[T]>> {

      let params = paramsProvider.pagingListParamsForBatch(batch)
      return httpClient
         .request(.GET, endPoint,
            parameters: paramsProvider.defaultParams + params,
            encoding: .URL,
            headers: nil)
         .map(PagingParser<T>.parse)
         .paginate(nextBatchTrigger,
                   hasNextPage: { (page) -> Bool in
                     return page.batch.next().hasNextPage
         }) { [weak self] (page) -> Observable<Page<[T]>> in
            return self?.paginateItems(page.batch.next(),
                                       endPoint: endPoint,
                                       nextBatchTrigger: nextBatchTrigger) ?? Observable.empty()
      }
   }
P.S.

Still not sure about the naming, or whether it could be applied to all scenarios.

Wondering what you guys think about its inclusion?

ignoreWhen (was: reject)

reject

reject operator as an opposite of filter.

Motivation for inclusion

It's a trivial operator, but I find it more readable than, e.g., .filter { $0 != foo }.

Example of usage

Observable.just("Act 1 Scene 5: The Great Hall in Capulet's mansion")
  .reject { $0.hasPrefix("Act 1") }

Instead of

Observable.just("Act 1 Scene 5: The Great Hall in Capulet's mansion")
  .filter { !$0.hasPrefix("Act 1") }

Subscription overloads to allow weak referencing of targets

Subscription overloads to allow weak referencing of targets

Currently when you subscribe to an observable the pattern is something like:

let obs: Observable<String>!
obs.subscribeNext { [weak self] value in
    self?.handleString(value)
}

This allows us to avoid retain cycles on self but is cumbersome. It is tempting to write the following:

let obs: Observable<String>!
obs.subscribeNext(self.handleString)

However, we have now introduced a retain cycle on `self`.

Motivation for inclusion

The second format is much more desirable but is obviously flawed - I'd like to introduce some overloads for subscription that get close to this format without the cycle...
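
The core idea can be sketched in current Swift syntax without any Rx types: wrap an unapplied instance method in a closure that holds its target weakly. The `Handler` class and this free-standing `weakify` function are hypothetical illustrations, not the proposed API.

```swift
// Hypothetical target type that records the values it handles.
final class Handler {
    var received: [String] = []
    func handle(_ value: String) { received.append(value) }
}

// Wraps an unapplied instance method so that the target is only held weakly.
func weakify<A: AnyObject, B>(_ obj: A, _ method: @escaping (A) -> (B) -> Void) -> (B) -> Void {
    return { [weak obj] value in
        guard let obj = obj else { return } // target is gone: silently do nothing
        method(obj)(value)
    }
}

var handler: Handler? = Handler()
let callback: (String) -> Void = weakify(handler!, Handler.handle)

callback("first")
let receivedBeforeRelease = handler!.received
print(receivedBeforeRelease) // ["first"]

handler = nil      // the callback does not keep the handler alive
callback("second") // safe to call; nothing is left to receive the value
```

Passing `Handler.handle` (the method on the type) rather than `handler.handle` (the method bound to an instance) is what avoids the strong capture.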

Example of use

Here is the full code for the extensions:

extension ObservableType {
    /**
     Leverages instance method currying to provide a weak wrapper around an instance function

     - parameter obj:    The object that owns the function
     - parameter method: The instance function represented as `InstanceType.instanceFunc`
     */
    private func weakify<A: AnyObject, B>(obj: A, method: ((A) -> (B) -> Void)?) -> ((B) -> Void) {
        return { [weak obj] value in
            guard let obj = obj else { return }
            method?(obj)(value)
        }
    }

    /**
     Subscribes an event handler to an observable sequence.

     - parameter weak: Weakly referenced object containing the target function.
     - parameter on: Function to invoke on `weak` for each event in the observable sequence.
     - returns: Subscription object used to unsubscribe from the observable sequence.
     */
    @warn_unused_result(message="http://git.io/rxs.ud")
    public func subscribe<A: AnyObject>(weak obj: A, _ on: (A) -> (RxSwift.Event<Self.E>) -> Void) -> Disposable {
        return self.subscribe(weakify(obj, method: on))
    }

    /**
     Subscribes an element handler, an error handler, a completion handler and disposed handler to an observable sequence.

     - parameter weak: Weakly referenced object containing the target function.
     - parameter onNext: Function to invoke on `weak` for each element in the observable sequence.
     - parameter onError: Function to invoke on `weak` upon errored termination of the observable sequence.
     - parameter onCompleted: Function to invoke on `weak` upon graceful termination of the observable sequence.
     - parameter onDisposed: Function to invoke on `weak` upon any type of termination of sequence (if the sequence has
     gracefully completed, errored, or if the generation is cancelled by disposing subscription)
     - returns: Subscription object used to unsubscribe from the observable sequence.
     */
    @warn_unused_result(message="http://git.io/rxs.ud")
    public func subscribe<A: AnyObject>(
        weak obj: A,
        onNext: ((A) -> (Self.E) -> Void)? = nil,
        onError: ((A) -> (ErrorType) -> Void)? = nil,
        onCompleted: ((A) -> () -> Void)? = nil,
        onDisposed: ((A) -> () -> Void)? = nil) -> Disposable {

            let disposable: Disposable

            if let disposed = onDisposed {
                disposable = AnonymousDisposable(weakify(obj, method: disposed))
            }
            else {
                disposable = NopDisposable.instance
            }


            let observer = AnyObserver { (e: RxSwift.Event<Self.E>) in
                switch e {
                case .Next(let value):
                    self.weakify(obj, method: onNext)(value)
                case .Error(let e):
                    self.weakify(obj, method: onError)(e)
                    disposable.dispose()
                case .Completed:
                    self.weakify(obj, method: onCompleted)()
                    disposable.dispose()
                }
            }

            return StableCompositeDisposable.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }

    /**
     Subscribes an element handler to an observable sequence.

     - parameter weak: Weakly referenced object containing the target function.
     - parameter onNext: Function to invoke on `weak` for each element in the observable sequence.
     - returns: Subscription object used to unsubscribe from the observable sequence.
     */
    @warn_unused_result(message="http://git.io/rxs.ud")
    public func subscribeNext<A: AnyObject>(weak obj: A, _ onNext: (A) -> (Self.E) -> Void) -> Disposable {
        return self.subscribeNext(weakify(obj, method: onNext))
    }

    /**
     Subscribes an error handler to an observable sequence.

     - parameter weak: Weakly referenced object containing the target function.
     - parameter onError: Function to invoke on `weak` upon errored termination of the observable sequence.
     - returns: Subscription object used to unsubscribe from the observable sequence.
     */
    @warn_unused_result(message="http://git.io/rxs.ud")
    public func subscribeError<A: AnyObject>(weak obj: A, _ onError: (A) -> (ErrorType) -> Void) -> Disposable {
        return self.subscribeError(weakify(obj, method: onError))
    }

    /**
     Subscribes a completion handler to an observable sequence.

     - parameter weak: Weakly referenced object containing the target function.
     - parameter onCompleted: Function to invoke on `weak` graceful termination of the observable sequence.
     - returns: Subscription object used to unsubscribe from the observable sequence.
     */
    @warn_unused_result(message="http://git.io/rxs.ud")
    public func subscribeCompleted<A: AnyObject>(weak obj: A, _ onCompleted: (A) -> () -> Void) -> Disposable {
        return self.subscribeCompleted(weakify(obj, method: onCompleted))
    }
}

And the sample usage:

class FooViewController: UIViewController {
    var obs: Observable<String>!

    override func viewDidLoad() {
        super.viewDidLoad()
        obs.subscribeNext(weak: self, FooViewController.handleString)
    }

    private func handleString(string: String) {
        //..
    }
}
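
The core of the proposal is the weakify helper, and its behaviour can be checked without RxSwift. The following is a minimal sketch in current Swift syntax, assuming a free function rather than a private extension method; `Log` and `Receiver` are hypothetical types used only to observe what gets delivered.

```swift
/// Wraps an unapplied instance method so that the target object is held weakly.
func weakify<A: AnyObject, B>(_ obj: A, _ method: @escaping (A) -> (B) -> Void) -> (B) -> Void {
    return { [weak obj] value in
        guard let obj = obj else { return }   // target is gone: do nothing
        method(obj)(value)
    }
}

/// Hypothetical sink that records what the receiver handled.
final class Log {
    var lines: [String] = []
}

/// Hypothetical target object, standing in for a view controller.
final class Receiver {
    let log: Log
    init(log: Log) { self.log = log }
    func handleString(_ string: String) { log.lines.append(string) }
}

let log = Log()
var receiver: Receiver? = Receiver(log: log)

// `Receiver.handleString` has type (Receiver) -> (String) -> Void.
let handler: (String) -> Void = weakify(receiver!, Receiver.handleString)

handler("first")    // delivered: the receiver is still alive
receiver = nil      // the handler holds no strong reference, so the object is released
handler("second")   // dropped: the weak reference is now nil
```

Because the closure captures the target weakly, storing the handler in a subscription does not keep the target alive, which is the property the subscribe(weak:) overloads rely on.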

Add Mac Framework

Name and description

I'm working on a Mac application and would love to be able to use these extensions. RxSwiftExt doesn't use any iOS-specific APIs, and all the tests pass on a macOS build.

Motivation for inclusion

Having a macOS build of the framework would help potentially dozens of Mac developers with handy Rx extensions. The bonus version of this would be to add a target for all of the platforms RxSwift has (iOS, macOS, watchOS, and tvOS).

pausableBuffered

Name and description

pausableBuffered – pauses emission of values and emits the latest value on resume

Motivation for inclusion

There already exists a pausable operator, but it doesn't store the latest value. This operator already exists in RxJS and simply for the sake of conformity it would be nice to have it here as well.

Sometimes the need is not to silence an observable (which is what pausable does) but to resume listening when ready, and at that point it makes sense to get the latest value.

Example of use

An example from my own needs, where pausable didn't cut it, was displaying search results only after they were ready to be presented (as this had side effects in the UI). Basically, I wanted to perform the search proactively but receive the results only once a modal sheet was closed. It might not be the best example, but I think this operator really has its place.

let isOverlayVisible = Variable(true)
let searchResults = searchInput
    .debounce(0.3, scheduler: MainScheduler.instance)
    .distinctUntilChanged()
    .flatMapLatest { str in searchService.search(str) }

let visibleSearchResults = searchResults.pausableBuffered(pauser: self.isOverlayVisible.asObservable())

isOverlayVisible.value = false

retry / repeat (was: retryWithDelay)

Name and description

retryWithDelay(numberOfAttempts: Int, delayIncrease: Double)

Motivation for inclusion

Allows an operation to wait some time before trying again in case of error. It also allows the wait to increase after each failed attempt.

Example of use

We could use this operator for the "Retrying to connect in..." behaviour that we see in many messaging apps.
Sometimes it may be better to wait some time before trying an operation again.

Suggested implementation

This was a suggested implementation made by @mjohnson (on the RxSwift Slack). I plan to improve on this solution to allow better parametrisation.

.retryWhen { (attempts) in
    return Observable.zip(attempts, Observable.range(start: 1, count: 4)) { n, i in
        return (n, i)
    }.flatMap { (n, i) in
        return i >= 4 ? Observable.error(n) : Observable<Int>.timer(RxTimeInterval(i), scheduler: MainScheduler.instance)
    }
}
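
The timing rule inside that flatMap can be looked at separately from Rx. The sketch below models it in plain Swift, assuming that the delay grows linearly with the failure number; `RetryStep`, `retryStep` and the meaning given to `delayIncrease` are hypothetical and are not part of the proposal's final API.

```swift
/// What to do after a given failure.
enum RetryStep: Equatable {
    case wait(seconds: Double)   // resubscribe after this delay
    case giveUp                  // propagate the error
}

/// Decision for the `failure`-th error (1-based).
/// Assumption: the wait is `failure * delayIncrease` seconds, and the
/// `numberOfAttempts`-th failure is the one that gives up.
func retryStep(afterFailure failure: Int, numberOfAttempts: Int, delayIncrease: Double) -> RetryStep {
    if failure >= numberOfAttempts {
        return .giveUp
    }
    return .wait(seconds: Double(failure) * delayIncrease)
}

// With 4 attempts and a 1-second increase this matches the snippet above:
// wait 1 s, 2 s and 3 s, then fail on the fourth error.
let schedule = (1...4).map {
    retryStep(afterFailure: $0, numberOfAttempts: 4, delayIncrease: 1.0)
}
```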

Observable.serialize

Name of the operator

Observable.serialize()

Motivation for inclusion

I'm new to Rx, so I'm not really sure whether this is an appropriate use of ReactiveX's serialize operator. In my case I think it's useful because I need to create a bunch of HTTP requests in the background, and in the response handler of each request I call observer.on(.Next(element)). In this case I want to ensure onNext will be delivered serially to the subscriber.

Example of usage

func download(urls: [String]) -> Observable<NSURL> {
    return Observable<NSURL>.create { observer in
      urls.forEach { url in
        Alamofire.download(.GET, url, destination: destination)
          .response(queue: self.concurrentQueue, completionHandler: { request, response, _, error in
            observer.on(.Next(destinationUrl))
          })
      }
      return NopDisposable.instance
    }
}

download(["url1", "url2"])
    .serialize()
    .subscribeNext { destinationUrl in
        // ...
    }

pairwise, nwise operators

pairwise(), nwise(n:)

Transforms a sequence to return every consecutive pair of emissions.

Motivation for inclusion

One often needs to compare a value with the previous one to detect changes. This would make that simpler than the rather convoluted combination of .scan, .filter and .map currently needed to achieve it.

Example of use

-----0-----1-----2-----3-----4----->
.pairwise()
     v     v     v     v     v
---------(0,1)-(1,2)-(2,3)-(3,4)--->
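
The windowing in the diagram can be sketched on a plain array. This is a model of the intended output, not the Rx operator; the free functions `pairwise` and `nwise` below are hypothetical.

```swift
/// Every consecutive pair of elements: [a, b, c] -> [(a, b), (b, c)].
func pairwise<T>(_ values: [T]) -> [(T, T)] {
    return Array(zip(values, values.dropFirst()))
}

/// Every run of `n` consecutive elements, as a sliding window.
func nwise<T>(_ values: [T], n: Int) -> [[T]] {
    guard n > 0, values.count >= n else { return [] }
    return (0...(values.count - n)).map { start in
        Array(values[start..<(start + n)])
    }
}

let pairs = pairwise([0, 1, 2, 3, 4])        // same input as the diagram
let triples = nwise([0, 1, 2, 3, 4], n: 3)
```

As in the diagram, nothing is produced for the first element alone: a pair only exists once the second value has arrived.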

Support for RxSwift 3.1.0

Name and description

RxSwift has released 3.1.0, but the podspec for RxSwiftExt specifies ~> 3.0.0.

Motivation for inclusion

I'd like to use some of the functionality of RxSwift 3.1.0, specifically the improved UIBindingObserver behavior.

Example of use

Hopefully this should be an easy fix. I'd be happy to add a PR to update the podspec, but I'm less sure what testing is needed before making this change. Happy to do it if given clear instructions...

Materialize/Dematerialize operators

Name and description

The materialize/dematerialize operators are described on the ReactiveX site here: http://reactivex.io/documentation/operators/materialize-dematerialize.html

I propose the following implementation, which uses Event as the type of the materialized stream:

extension ObservableType {
    public func materialize() -> Observable<Event<E>> {
        return Observable.create { observer in
            return self.subscribe { event in
                observer.onNext(event)
                if event.isStopEvent {
                    observer.onCompleted()
                }
            }
        }
    }
}

protocol EventConvertibleType {
    associatedtype E
    func asEvent() -> Event<E>
}

extension Event : EventConvertibleType {
    typealias E = Element
    func asEvent() -> Event<E> { return self }
}

extension ObservableType where E: EventConvertibleType {
    func dematerialize() -> Observable<E.E> {
        return Observable.create { observer in
            return self.subscribe(onNext: { event in
                switch event.asEvent() {
                case .next(let element): observer.onNext(element)
                case .error(let error): observer.onError(error)
                case .completed: observer.onCompleted()
                }
            })
        }
    }
    
    /// Return only the error events of an Observable<Event<E>>
    func errors() -> Observable<Error> {
        return self.filter { $0.asEvent().error != nil }
            .map { $0.asEvent().error! }
    }
    
    /// Return only the onNext element events of an Observable<Event<E>>
    func elements() -> Observable<E.E> {
        return self.filter { $0.asEvent().element != nil }
            .map { $0.asEvent().element! }
    }
}

Motivation for inclusion

When reviewing the Kickstarter iOS code base, which was recently open sourced, I noticed this pattern for dealing with errors:

(from LoginViewModels.swift:112):

    let loginEvent = emailAndPassword
      .takeWhen(tryLogin)
      .switchMap { email, password in
        AppEnvironment.current.apiService.login(email: email, password: password, code: nil)
          .materialize()
    }

    self.logIntoEnvironment = loginEvent.values()

    let tfaError = loginEvent.errors()
      .filter { $0.ksrCode == .TfaRequired }
      .ignoreValues()

Note that Kickstarter uses ReactiveCocoa, not RxSwift, so the operators are a bit different, but the core idea is the same. Using materialize is a convenient way to turn any Observable into a new observable where errors are just another kind of event.

By itself, this is not terribly exciting because plain Observables can return .next or .error as well. But combined with flatMap this can be very convenient because errors generated by the flatMap closure will not terminate the flatMap with an error.

The same thing can be accomplished by using an enum like Result or a custom enum. But materialize/dematerialize is standardized by ReactiveX, and it does not require the user to create new custom data types.

Example of use

Consider this example from the RxSwift documentation for Units:

let results = query.rx.text
    .throttle(0.3, scheduler: MainScheduler.instance)
    .flatMapLatest { query in
        fetchAutoCompleteItems(query)
            .observeOn(MainScheduler.instance)  // results are returned on MainScheduler
            .catchErrorJustReturn([])           // in the worst case, errors are handled
    }
    .shareReplay(1)                             // HTTP requests are shared and results replayed
                                                // to all UI elements

Because fetchAutoCompleteItems(query) might fail, catchErrorJustReturn([]) is used to return an empty result. But in general, failure might need to be handled in a different way, for example by popping up an alert or performing some other action. While this could be done with a do operator, the actions could be complex, and performing them here would complicate the code in the common case and obscure the core logic.

Instead, we could materialize the results (this is kind of artificial so it is not necessarily recommended in this particular case, but the idea should stand):

let resultEvents = query.rx.text
    .throttle(0.3, scheduler: MainScheduler.instance)
    .flatMapLatest { query in
        fetchAutoCompleteItems(query)
            .observeOn(MainScheduler.instance)
            .materialize()          // now the results have type Event<[String]>
    }
    .shareReplay(1)

let results = resultEvents.elements()
let errors = resultEvents.errors()

// To get the results with empty results for errors, the same as the original code
let resultsWithEmptyErrors = Observable.of(results, errors.map { [] }).merge()

// Now we can process error events and show alerts, or perform other processing
errors.subscribe(onNext: { 
    log.error($0.localizedDescription)
    showAlert($0)
})
.addDisposableTo(disposeBag)

The dematerialize operator is here for completeness, though I don't have a compelling use case for it at the moment.

Bring back errors() & elements() for materialized Observable

Name and description

Bring back errors() & elements() for materialized Observable

Motivation for inclusion

materialize() was removed in the latest (2.3.0) release of RxSwiftExt, as it became part of the core. However, RxSwiftExt used to offer two additional operators for materialized Observables which were very handy. They were used to separate elements from errors.

I think it is a good idea to bring them back for 2 reasons:

  1. it's very handy to have them
  2. Stability of projects which use RxSwiftExt. Right now, each project which updates RxSwiftExt to the newest version and had used elements()/errors() will stop compiling because these operators are missing.

Example of use

Usage is unchanged from the pre-2.3.0 releases:

        let imageResult = _chooseImageButtonPressed.asObservable()
            .flatMap { imageReceiver.image.materialize() }
            .share()
        
        image = imageResult
            .elements()
            .asDriver(onErrorDriveWith: .never())
        
        errorMessage = imageResult
            .errors()
            .map(mapErrorMessages)
            .unwrap()
            .asDriver(onErrorDriveWith: .never())

Pull request

As soon as the issue is accepted, I will create a pull request with the changes ;)

repeatWhen

repeatWhen

Repeats an observable when another observable emits.

extension ObservableType {
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func repeatWhen<O: ObservableConvertibleType>(@autoclosure notificationHandler: (() -> O)) -> Observable<E> {
        return notificationHandler()
            .asObservable()
            .flatMapLatest { _ in
                return self
            }
    }
}

Motivation for inclusion

ReactiveX/RxSwift#396

Example of use

Polling where the repetition intervals are not equal.

Could be one of the underlying operators for #15.

ignoreErrors

ignoreErrors

Ignore errors produced by an observable. Two forms are proposed for this operator:

extension ObservableType {
    /**
     Unconditionally ignore all errors produced by the source observable

    - returns: An observable sequence that never fails
    */
    @warn_unused_result(message="http://git.io/rxs.uo")
    public func ignoreErrors() -> Observable<E>

    /**
     Conditionally ignore errors produced by the source observable

     - parameters:
     - returns: An observable sequence that never fails
     */
    public func ignoreErrors(predicate : (ErrorType) -> Bool) -> Observable<E>
}

Motivation for inclusion

There was a discussion on the RxSwift Slack channel about the need to sometimes ignore errors (or some errors) from network requests. Hence the proposal for two variants of the operator: one that unconditionally ignores all errors, and another that selectively ignores errors for which the predicate returns true.

negate(Bool) -> Bool

negate

negate() negates a Bool value

Not sure if that's the best name for the op?

Motivation for inclusion

Especially when working with UI you need to show/hide controls - the problem is that the same observable (let's say shouldShowSearchBar) will drive both the visibility of a search and the fact that some buttons will not be hidden - I get to use that often and I think having a map {!$0} for each case gets very verbose.

Example of usage

shouldShowSearchBar.bindTo(searchBar.rx_firstResponder) //true will make the bar first responder
shouldShowSearchBar.negate().bindTo(searchResults.rx_hidden) //true -> false will **unhide** the view

cascade (was: switchTo)

Name and description

switchTo(other : Observable<E>)

This operator will send all elements emitted by the source observable until the other observable starts emitting elements, at which point only elements emitted by other will be considered.

It may need an additional optional flag to control whether the resulting observable completes when self and/or other completes.

Motivation for inclusion

Allows subscribing to multiple sequences but irrevocably switching to the next when it starts emitting. This use case doesn't seem to be covered by any existing Rx operator.

Suggested implementation

extension ObservableType {
    func switchTo(other: Observable<E>) -> Observable<E> {
        return Observable.create { observer in
            let a = self.asObservable().subscribe(observer)
            let b = other.subscribe { (event) in
                a.dispose()
                observer.on(event)
            }
            return AnonymousDisposable {
                a.dispose()
                b.dispose()
            }
        }
    }
}

Retry predicate is inverted when we pass in the error parameter

Retry predicate is inverted

In retryWithBehavior.swift, I see this in the retry function:

if let shouldRetry = shouldRetry, !shouldRetry(error) {
    // also return error if predicate says so
    return Observable.error(error)
}

Why are we using !shouldRetry(error) instead of shouldRetry(error)?
It tends to be non-intuitive to return false for the error conditions I want to retry. Is this intentional or a bug?
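
Read in isolation, the quoted guard propagates the error only when a predicate is supplied and returns false, which would mean that returning true asks for a retry. A small plain-Swift sketch of just that condition follows; the function and the error types are hypothetical stand-ins, not RxSwiftExt code.

```swift
// Hypothetical error types for illustration.
struct Timeout: Error {}
struct Unauthorized: Error {}

/// Mirrors the shape of the quoted guard: the error is propagated (no retry)
/// only when a predicate exists and it returns false for this error.
func propagatesError(_ error: Error, shouldRetry: ((Error) -> Bool)?) -> Bool {
    if let shouldRetry = shouldRetry, !shouldRetry(error) {
        return true
    }
    return false
}

// A predicate that answers "should we retry?" with true for timeouts only.
let retryOnlyTimeouts: (Error) -> Bool = { $0 is Timeout }

let timeoutPropagates = propagatesError(Timeout(), shouldRetry: retryOnlyTimeouts)
let unauthorizedPropagates = propagatesError(Unauthorized(), shouldRetry: retryOnlyTimeouts)
let noPredicatePropagates = propagatesError(Unauthorized(), shouldRetry: nil)
```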

CI integration setup

Hi, guys

Since we already have some working operators, what do you think about setting up CI?

not() operator accessible by Driver instances

Name and description

Enable use of not() operator by Driver instances.

Motivation for inclusion

It would make inverting a Driver value simpler.
Instead of:

viewModel.isShowResult.asObservable().not().asDriver(onErrorJustReturn: false).drive(tipButton.rx.isHighlighted)
            .addDisposableTo(disposeBag)

Just:
viewModel.isShowResult.not().drive(tipButton.rx.isHighlighted).addDisposableTo(disposeBag)

Example of use

viewModel.isShowResult.not().drive(tipButton.rx.isHighlighted).addDisposableTo(disposeBag)

Carthage build fail in release 2.4.0

I'm getting the following error with your latest release (2.4.0) using Carthage.
It was working before, so I have to explicitly pin 2.2.0 in my Cartfile, which is not my preferred way.

swift --version
Apple Swift version 3.1 (swiftlang-802.0.51 clang-802.0.41)
Target: x86_64-apple-macosx10.9

xcode 8.3.1

Building scheme "RxSwiftExt" in RxSwiftExt.xcworkspace
Build Failed
Task failed with exit code 65:
/usr/bin/xcrun xcodebuild -workspace PATH_TO_APP/Carthage/Checkouts/RxSwiftExt/RxSwiftExt.xcworkspace -scheme RxSwiftExt -configuration Release -derivedDataPath .../Library/Caches/org.carthage.CarthageKit/DerivedData/RxSwiftExt/2.4.0 -sdk iphoneos ONLY_ACTIVE_ARCH=NO BITCODE_GENERATION_MODE=bitcode CODE_SIGNING_REQUIRED=NO CODE_SIGN_IDENTITY= CARTHAGE=YES clean build

build errors are:

materialized+elements.swift:10:35: error: use of undeclared type 'EventConvertible'
extension ObservableType where E: EventConvertible {
^~~~~~~~~~~~~~~~

materialized+elements.swift:16:41: error: 'ElementType' is not a member type of 'Self.E'
public func elements() -> Observable<E.ElementType> {
~ ^

materialized+elements.swift:26:34: error: ambiguous reference to member '!='
return filter { $0.event.error != nil }
^~
Swift.!=:3:13: note: found this candidate
public func !=(t0: Any.Type?, t1: Any.Type?) -> Bool
^
Swift.!=:5:13: note: found this candidate
public func !=(lhs: UInt8, rhs: UInt8) -> Bool
^
Swift.!=:5:13: note: found this candidate
public func !=(lhs: Int8, rhs: Int8) -> Bool
^
Swift.!=:5:13: note: found this candidate
public func !=(lhs: UInt16, rhs: UInt16) -> Bool
^
Swift.!=:5:13: note: found this candidate
public func !=(lhs: Int16, rhs: Int16) -> Bool
^
Swift.!=:5:13: note: found this candidate
public func !=(lhs: UInt32, rhs: UInt32) -> Bool
^
Swift.!=:5:13: note: found this candidate
public func !=(lhs: Int32, rhs: Int32) -> Bool
^
Swift.!=:5:13: note: found this candidate
public func !=(lhs: UInt64, rhs: UInt64) -> Bool
^
Swift.!=:5:13: note: found this candidate
public func !=(lhs: Int64, rhs: Int64) -> Bool
^
Swift.!=:5:13: note: found this candidate
public func !=(lhs: UInt, rhs: UInt) -> Bool
^
Swift.!=:5:13: note: found this candidate
public func !=(lhs: Int, rhs: Int) -> Bool
^
Swift.!=:2:13: note: found this candidate
public func !=(lhs: ContiguousArray, rhs: ContiguousArray) -> Bool where Element : Equatable
^
Swift.!=:2:13: note: found this candidate
public func !=(lhs: ArraySlice, rhs: ArraySlice) -> Bool where Element : Equatable
^
Swift.!=:2:13: note: found this candidate
public func !=(lhs: [Element], rhs: [Element]) -> Bool where Element : Equatable
^
Swift.!=:6:13: note: found this candidate
public func !=(lhs: T, rhs: T) -> Bool where T : RawRepresentable, T.RawValue : Equatable
^
Swift.!=:6:13: note: found this candidate
public func !=(lhs: T, rhs: T) -> Bool where T : Equatable, T : RawRepresentable, T.RawValue : Equatable
^
Swift.!=:12:13: note: found this candidate
public func !=(lhs: T, rhs: T) -> Bool where T : Equatable
^
Swift.!=:34:13: note: found this candidate
public func !=(lhs: T?, rhs: T?) -> Bool where T : Equatable
^
Swift.!=:22:13: note: found this candidate
public func !=(lhs: T?, rhs: _OptionalNilComparisonType) -> Bool
^
Swift.!=:22:13: note: found this candidate
public func !=(lhs: _OptionalNilComparisonType, rhs: T?) -> Bool
^
Swift.!=:20:13: note: found this candidate
public func !=<A, B>(lhs: (A, B), rhs: (A, B)) -> Bool where A : Equatable, B : Equatable
^
Swift.!=:20:13: note: found this candidate
public func !=<A, B, C>(lhs: (A, B, C), rhs: (A, B, C)) -> Bool where A : Equatable, B : Equatable, C : Equatable
^
Swift.!=:20:13: note: found this candidate
public func !=<A, B, C, D>(lhs: (A, B, C, D), rhs: (A, B, C, D)) -> Bool where A : Equatable, B : Equatable, C : Equatable, D : Equatable
^
Swift.!=:20:13: note: found this candidate
public func !=<A, B, C, D, E>(lhs: (A, B, C, D, E), rhs: (A, B, C, D, E)) -> Bool where A : Equatable, B : Equatable, C : Equatable, D : Equatable, E : Equatable
^
Swift.!=:20:13: note: found this candidate
public func !=<A, B, C, D, E, F>(lhs: (A, B, C, D, E, F), rhs: (A, B, C, D, E, F)) -> Bool where A : Equatable, B : Equatable, C : Equatable, D : Equatable, E : Equatable, F : Equatable
^
Swift.LazyFilterIndex:3:24: note: found this candidate
public static func !=(lhs: LazyFilterIndex, rhs: LazyFilterIndex) -> Bool
^
Swift.Dictionary<Key, Value>:3:24: note: found this candidate
public static func !=(lhs: [Key : Value], rhs: [Key : Value]) -> Bool
^

** BUILD FAILED **

The following build commands failed:
CompileSwift normal arm64
CompileSwiftSources normal arm64 com.apple.xcode.tools.swift.compiler
(2 failures)

combineLatest(o1: Observable<T>, o2: Observable<U>)

Name and description

combineLatest(o1, o2) takes two observables of types Observable<T> and Observable<U> and combines them into a single Observable<(T, U)>.

Motivation for inclusion

I have found myself (a few times already) producing a combination of several inputs in my app's UI, e.g. a search bar and a toggle bar item. However, I always have a few bindings to bind that input to, so I end up creating a combined observable that shares replay and then subscribing to it multiple times.

I think it's gonna be easier to be able to avoid providing a closure to combineLatest and just bundle together all the inputs, so one can subscribe multiple times afterwards

PS: It might be nicer to allow the existing combineLatest operator to take a nil parameter for its closure parameter and in that case just bundle up the inputs as a tuple but that would mean a change in RxSwift itself vs. a custom operator in RxSwift-Ext

Maybe @kzaher could chime in with some argumentation if that'd make for a main repo patch or it doesn't make sense to?

Example of use

let input = Observable.combineLatest(searchBar.rx_text, segmented.rx_value).shareReplay(1)

input
   .map {input in NSURL(string: "...blah blah blah use input blah")!}
   .flatMapLatest {url in
      NSURLSession.sharedSession().rx_JSON(url)
   } ... etc. etc.

input
   .subscribeNext {search, selectedIndex in 
       ... refresh table view blah blah blah ...
   }

Refine unwrap

unwrap was included as an example while we were drafting the repo structure. I still need to:

  1. add more tests - test for values, edge cases
  2. have another look at the code
  3. refine the playground example

I think it's important for this one to look the way we want the code to look, since this is the first piece of code and others who open PRs would look at it for guidance on how to structure their work (maybe?)

catchErrorJustComplete

Name and description

catchErrorJustComplete completes sequence if it fails with error

Motivation for inclusion

  • improves declarative nature of Rx, so you don't need to write every time:

catchError { return Observable.empty() }

Example of use

    someObservable.catchErrorJustComplete()

I remember that it was proposed in Slack by dpaschich, so he is probably the one who would like to port it :)

limitRateOrDistinct

This is the result of a chat in the Slack channel.

I was looking for a way to turn

4-4-4-------------4-4-----------4-5 into 4-4-4-5

which is something in between distinct and debounce.

Here is the suggested solution (as yet untested):

extension ObservableType where E: Equatable {
    public func limitRateOrDistinct(rate: Double, scheduler: SchedulerType) -> Observable<E>{
        var lastEmit: (item: E, time: NSDate)?
        return self.flatMap { (newItem: E) -> Observable<E> in
            guard let lastEmit = lastEmit else { return Observable.just(newItem) }
            return scheduler.now.timeIntervalSinceDate(lastEmit.time) > rate
                || lastEmit.item != newItem ? .just(newItem) : .empty()
            }
            .doOnNext { emitted in
                lastEmit = (emitted, scheduler.now)
            }
    }
}

ignore(element ...)

Name of the operator

ignore(element ...)

Motivation for inclusion

This operator allows ignoring one or more specific elements emitted by the source sequence. The idea is to have the operator take a variable number of parameters (all of the element type) or an array.

The only constraint is that elements must conform to the Equatable protocol.

Example of usage

  [1,2,3,4,5,6,7,8,9,10].toObservable().ignore(5,6,7).subscribeNext { print($0) }
  // prints all but 5,6,7 items
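
The variadic signature and the Equatable constraint can be tried out on a plain Sequence. This is a sketch of the semantics only, assuming a hypothetical `ignore` extension on Sequence; the proposed operator would live on ObservableType instead.

```swift
extension Sequence where Element: Equatable {
    /// Hypothetical plain-Swift counterpart of the proposed operator:
    /// drops every element that is equal to one of `ignored`.
    func ignore(_ ignored: Element...) -> [Element] {
        return filter { !ignored.contains($0) }
    }
}

// Same values as the example above.
let kept = Array(1...10).ignore(5, 6, 7)
```

Inside the function the variadic parameter is an ordinary array, so an overload taking `[Element]` could share the same body.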

weak map operator

Name and description

map(weak:selector:) is a convenience map operator.

public extension ObservableType {
  public func map<A: AnyObject, R>(weak obj: A, selector: @escaping (A, Self.E) throws -> R) -> Observable<R> {
    return flatMap { [weak obj] (value) -> Observable<R> in
      // Skip the element once the weakly held object has been released
      guard let obj = obj else { return .empty() }
      // Map the current element only, instead of re-subscribing to self
      return .just(try selector(obj, value))
    }
  }
}

Motivation for inclusion

When using self in the map operator, we need to write a [weak self] clause every time, like this:

button.rx.tap.map({ [weak self] _ in self?.textView.text }).bind(to: publishText).disposed(by: bag)

Example of use

button.rx.tap.map(weak: textView, selector: { textView, _ in textView.text }).bind(to: publishText).disposed(by: bag)

Observable.once

Name of the operator

Observable.once() or Observable.justOnce()

Motivation for inclusion

The Observable.just() function creates a sequence that delivers a single value to its observers, then completes. In some cases you want an observable to deliver a value once and exactly once.

For example it may be a one-time location value for setting up the initial position of a map: you want the initial position to be forced the first time the map is displayed, but not subsequent times after the user has moved the map.

Example of use

class MyViewController : UIViewController {
  @IBOutlet weak var mapView : MKMapView!

  let initialMapCenter : Observable<CLLocation> = Observable.justOnce(someLocation)

  override func viewDidAppear(animated : Bool) {
    super.viewDidAppear(animated)
    _ = initialMapCenter.subscribeNext { [weak self] location in
      self?.mapView.setCenterCoordinate(location.coordinate, animated: true)
    }
  }
}

Meaningful transformation into Driver trait (JustComplete & IgnoreErrors)

Adding .justComplete & .ignoreError for Driver transformation

Motivation for inclusion

I think it is good practice to use Driver as the output of every ViewModel instead of the Observable type. It means you have to decide what to do with error events. The two most popular ways would be:

  1. Provide "default value" with .asDriver(onErrorJustReturn:<type>)
  2. To "ignore" errors with .asDriver(onErrorDriveWith: .never()) or .asDriver(onErrorDriveWith: .empty())

Sometimes it's hard (or it would be misleading) to provide a default value for a sequence. The second possibility, with onErrorDriveWith:, doesn't describe the author's intention. When reading it we have to make a few inferences in our heads, e.g.: if .never() then read as "ignore", if .empty() then read as "just complete".

What I want to propose is a small piece of syntactic sugar for this "brain-process".

asDriver(onError fallbackStrategy: ErrorFallbackStrategy)

Example of use

let outputDriver = theObservable.asDriver(onError: .ignoreErrors)
//which is equal to:
//let outputDriver = theObservable.asDriver(onErrorDriveWith: .never())

or

let outputDriver = theObservable.asDriver(onError: .justComplete)
// which is equal to:
// let outputDriver = theObservable.asDriver(onErrorDriveWith: .empty())

I think the proposed operator describes more clearly what we want to achieve :)
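The proposed overload could be sketched roughly as follows, assuming RxSwift 5 and RxCocoa. The enum ErrorFallbackStrategy and its two cases are taken from the proposal's signature and examples. The extension body is an illustrative guess, not code from the library.

```swift
import RxSwift
import RxCocoa

/// Hypothetical enum, named after the proposal's `asDriver(onError:)` signature.
enum ErrorFallbackStrategy {
    case ignoreErrors   // swallow the error, the Driver never terminates
    case justComplete   // swallow the error, the Driver completes
}

extension ObservableConvertibleType {
    /// Sketch: forwards to RxCocoa's existing `asDriver(onErrorDriveWith:)`.
    func asDriver(onError fallbackStrategy: ErrorFallbackStrategy) -> Driver<Element> {
        switch fallbackStrategy {
        case .ignoreErrors:
            return asDriver(onErrorDriveWith: .never())
        case .justComplete:
            return asDriver(onErrorDriveWith: .empty())
        }
    }
}

struct SampleError: Error {}

var didComplete = false
_ = Observable<Int>.error(SampleError())
    .asDriver(onError: .justComplete)
    .drive(onCompleted: { didComplete = true })
```

Because the new label is onError:, the overload does not collide with the existing onErrorJustReturn: and onErrorDriveWith: variants.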

Adding mapElements operator


In every project where I use RxSwift I end up adding a small helper for mapping the elements inside the array of an Observable<[Type]>. Maybe it's time to include it in a library of common, additional Rx operators ;)

Motivation for inclusion

My motivation is simple. This... :

theObservable
  .mapElements(TheModel.init)

is more readable than... :

theObservable
  .map { array in
    return array.map(TheModel.init)
  }

Example of use

Example can be found just above ;)
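For reference, the helper itself can be very small. A minimal sketch assuming RxSwift 5 (where the associated type is named Element). The operator name mapElements is the one proposed here, and the generic constraint is an assumption about how it might be declared.

```swift
import RxSwift

extension ObservableType where Element: Sequence {
    /// Hypothetical sketch: applies `transform` to every item of each emitted sequence.
    func mapElements<T>(_ transform: @escaping (Element.Element) throws -> T) -> Observable<[T]> {
        // Outer `map` is the Rx operator, inner `map` is Sequence.map.
        return map { try $0.map(transform) }
    }
}

var doubled: [[Int]] = []
_ = Observable.just([1, 2, 3])
    .mapElements { $0 * 2 }
    .subscribe(onNext: { doubled.append($0) })
// doubled is now [[2, 4, 6]]
```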

bindErrorTo

Name and description

bindErrorTo forwards the error of an Observable to an Observer<ErrorType> as a next event.
This is my current implementation:

extension ObservableType {
    @warn_unused_result(message="http://git.io/rxs.uo")
    func bindErrorTo<O: ObserverType where O.E == ErrorType>(observer: O) -> Observable<E> {
        return doOnError { observer.onNext($0) }
    }
}

Motivation for inclusion

When implementing MVVM, especially with UI bindings (like RxDataSources), my ViewModel often has an error: PublishSubject<ErrorType> to handle errors. So when I run some logic, such as requesting data from the network, I need to inject a side effect by calling doOnError:

class SomeViewModel {
    private let errorSubject: PublishSubject<ErrorType> = PublishSubject<ErrorType>()
    var errors: Observable<ErrorType> { return errorSubject.asObservable() }

// Request data
 didBecomeActive
            .flatMapLatest { [unowned self] _ in
                return self.getProfileDetail()
                    .doOnError { self.errorSubject.onNext($0) }
                    .catchErrorJustComplete()
            }
}

Example of use

With convenient operator:

// Request data
 didBecomeActive
            .flatMapLatest { [unowned self] _ in
                return self.getProfileDetail()
                    .bindErrorTo(self.errorSubject)
                    .catchErrorJustComplete()
            }

Min & Max

Name and description

Min, Max

Propagates the min or max element of the observable sequence, based on a comparison closure.

Motivation for inclusion

Sometimes it is useful to determine the min or max element of a sequence.

Proposed implementation

extension ObservableType {
   func edgeElement(isOrderedBefore: (E, E) -> Bool) -> Observable<E> {
      return reduce(Optional<E>.None) { (acum, element) in
         if let current = acum where isOrderedBefore(element, current) {
            return Optional.Some(element)
         }
         if let current = acum where isOrderedBefore(current, element) {
            return acum
         }
         return Optional.Some(element)
         }
         .flatMap { element -> Observable<E> in
            if let some = element {
               return Observable.just(some)
            }
            return Observable.empty()
      }
   }
}

extension ObservableType where E: Comparable {
   func max() -> Observable<E> {
      return edgeElement(>)
   }

   func min() -> Observable<E> {
      return edgeElement(<)
   }
}
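The folding logic of edgeElement can be tried without RxSwift. A minimal sketch, assuming a plain array stands in for the observable sequence. The free function and its signature are illustrative only and not part of the proposal.

```swift
/// Array-based stand-in for the proposed `edgeElement` reduce step.
func edgeElement<T>(_ elements: [T], isOrderedBefore: (T, T) -> Bool) -> T? {
    return elements.reduce(T?.none) { (best: T?, element: T) -> T? in
        // First element: nothing to compare against yet.
        guard let current = best else { return element }
        if isOrderedBefore(element, current) { return element }
        if isOrderedBefore(current, element) { return current }
        // Neither is ordered before the other (a tie): keep the later element.
        return element
    }
}

let values = [3, 1, 4, 1, 5]
let smallest = edgeElement(values, isOrderedBefore: <)   // Optional(1)
let largest = edgeElement(values, isOrderedBefore: >)    // Optional(5)
let nothing = edgeElement([Int](), isOrderedBefore: <)   // nil
```

The nil result for an empty input corresponds to the flatMap step in the proposal, which turns a missing value into an empty observable.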

apply operator

Name and description

I'd like to propose a very simple operator, apply, which takes a transformation function that takes an Observable and returns an Observable, and simply runs it and returns its value:

extension ObservableType {
    func apply<T>(_ transform: (Observable<Self.E>) -> Observable<T>) -> Observable<T> {
        return transform(self.asObservable())
    }
}

Motivation for inclusion

It's idiomatic to write new operators as extensions of ObservableType. This makes sense for many custom operators because it preserves the chaining syntax of RxSwift. But sometimes this style of extension operator is uncomfortable when the operator is not generic in nature, but might perform some combination of application-specific actions, and may even have side-effects.

In these cases it makes sense to write a simple function to add operations to an Observable, for example:

// Take an ordinary Rx-style request and add retry, application-specific side-effect, and error parsing.
func requestPolicy(_ request: Observable<Void>) -> Observable<Response> {
    return request.retry(maxAttempts)
        .do(onNext: sideEffect)
        .map { Response.success }
        .catchError { error in Observable.just(parseRequestError(error: error)) }
}

This function can then be applied to several requests to apply consistent retries, side-effects, and error handling:

let request1 = Observable<Void>.create { ... }
let request2 = Observable<Void>.create { ... } 
let request1Resilient = requestPolicy(request1)
let request2Resilient = requestPolicy(request2)

But this syntax is awkward because it requires each policy to be wrapped around the observable. The use of apply returns this to a more familiar chaining syntax:

let request1Resilient = request1.apply(requestPolicy)
let request2Resilient = request2.apply(requestPolicy)

This is especially useful when composing multiple transform functions:

// Without apply
let multiplePolicy = policy3(policy2(policy1(request)))
// With apply
let multiplePolicy = request
  .apply(policy1)
  .apply(policy2)
  .apply(policy3)

There may also be a desire to have a version with one or more arguments to apply:

    func apply<A, T>(arg: A, _ transform: (A, Observable<Self.E>) -> Observable<T>) -> Observable<T> {
        return transform(arg, self.asObservable())
    }

Example of use

See above.

throttledSample

Name of the operator

extension ObservableType {
  public func throttledSample(interval: RxTimeInterval, pauseInterval: RxTimeInterval = 0, scheduler: SchedulerType)
    -> Observable<E>
}

This operator will deliver the latest element of the source sequence at a specified interval. If the source sequence keeps sending out new elements, you are guaranteed to receive one every interval seconds.

If the source sequence stops sending elements:

  • if pauseInterval is specified and > 0, send the latest element after pauseInterval seconds
  • if pauseInterval is not specified or == 0, the latest element will be sent after interval seconds

If no element is sent during interval seconds, throttledSample will send the first element that comes after this delay, then reset its internal timer and deliver the next latest element after interval seconds at the latest.

Motivation for inclusion

This operator is what throttle should have been right from the start. The standard throttle operator won't deliver anything if the source sequence keeps sending new elements at intervals less than the throttle interval. What you usually want (particularly when handling UI) is a regular sample of the latest value, but no sample when there was no new value.

In some cases, for example (and typically) when processing text typed in a text field to deliver search results, you want to receive the latest typed text faster than interval seconds when the user stops typing. See below for details.

This operator is more complex than the average but allows modelling a finely tuned delivery of sequence elements over time.

Example of usage

In the context of throttling a textfield to provide search results, the ideal behavior is the following:

  • when the user starts typing, deliver a first sample immediately
  • while the user keeps typing, update the search results every 2 seconds, which is often enough
  • when the user stops or pauses typing, update the search results quickly, after 0.3 seconds

This behavior is what a user would expect from a search text field and can be modeled as follows:

let latestResults : Variable<[SearchResult]> = Variable([])

self.textField.rx_text
   .throttledSample(2.0, pauseInterval: 0.3, scheduler: networkRequestsScheduler)
   .flatMapLatest { getSearchResults($0) }
   .asDriver(onErrorJustReturn: [])
   .drive(latestResults)

mapTo(Any)

replaceWith(AnyObject)

Maps any value to a constant.

func replaceWith<R>(value: R) -> Observable<R> {
        return map { _ in value }
    }

Motivation for inclusion

Sometimes I just need to replace whatever value an Observable emits with a given constant. I often do that for buttons:

[btnPlay.rx_tap.map({_ in true}), btnStop.rx_tap.map({_ in false})].toObservable()

Example of usage

[btnPlay.rx_tap.replaceWith(true), btnStop.rx_tap.replaceWith(false)].toObservable()

pausable operator

Name and description

pausable - pauses emission of values.

Motivation for inclusion

Provides a simple way to filter one observable based on the current state of another. It can be composed from withLatestFrom and filter, but having a specific operator improves readability.

Example of use

let thingThatNeedsTheSpeaker = Observable<Thing>.create { ... }
let noCallInProgress = Observable<Bool>.create { /* wrapper around CoreTelephony.CTCallCenter() */ }

let validThings = thingThatNeedsTheSpeaker.pausable(noCallInProgress)
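The composition mentioned in the motivation can be sketched as follows, assuming RxSwift 5. This is an illustration of the withLatestFrom plus filter approach. The generic parameter name Pauser and the subjects in the usage lines are hypothetical.

```swift
import RxSwift

extension ObservableType {
    /// Sketch: lets elements through only while the latest `pauser` value is `true`.
    func pausable<Pauser: ObservableType>(_ pauser: Pauser) -> Observable<Element>
        where Pauser.Element == Bool {
        return withLatestFrom(pauser) { element, isActive in (element, isActive) }
            .filter { $0.1 }   // drop elements emitted while paused
            .map { $0.0 }      // unwrap the original element
    }
}

let source = PublishSubject<Int>()
let isActive = BehaviorSubject<Bool>(value: false)
var received: [Int] = []
_ = source.pausable(isActive).subscribe(onNext: { received.append($0) })

source.onNext(1)        // dropped, pauser is false
isActive.onNext(true)
source.onNext(2)        // delivered
```

Elements that arrive while paused are discarded rather than buffered, so nothing is replayed when the pauser turns true again.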
