If .combineLatest() is used with publishers whose values are received on multiple threads, then the values are not always received.
This is probably a Combine issue and not an issue with this library.
/// Stress-tests `combineLatest()` over four subjects whose output is each
/// re-dispatched with `receive(on:)` onto a different global queue.
///
/// Every iteration sends one value per subject, waits for the first combined
/// value `[1, 2, 3, 4]`, replaces the first subject's value with 5, waits for
/// `[5, 2, 3, 4]`, then finishes all four subjects and waits for completion.
func testCollectionCombineLatestWithFinishedEvent_threads() {
    // Stop at the first failure: otherwise a single lost value lets the loop
    // run through all remaining iterations, each burning further 5 s timeouts.
    continueAfterFailure = false

    for x in 0 ... 1000 {
        print("--- iteration \(x)")

        let first = PassthroughSubject<Int, Never>()
        let second = PassthroughSubject<Int, Never>()
        let third = PassthroughSubject<Int, Never>()
        let fourth = PassthroughSubject<Int, Never>()

        let completedExp = expectation(description: "completed")
        let values1234 = expectation(description: "1, 2, 3, 4")
        // Description matches the values actually checked below.
        let values12345234 = expectation(description: "1, 2, 3, 4, 5, 2, 3, 4")

        var results = [[Int]]()

        subscription = [
            first.receive(on: DispatchQueue.global(qos: .userInitiated)),
            second.receive(on: DispatchQueue.global(qos: .userInteractive)),
            third.receive(on: DispatchQueue.global(qos: .background)),
            fourth.receive(on: DispatchQueue.global(qos: .utility)),
        ]
        .combineLatest()
        .sink(
            receiveCompletion: { _ in completedExp.fulfill() },
            receiveValue: { combined in
                results.append(combined)
                // Each comparison can only match once, so no expectation is
                // fulfilled more than once.
                if results == [[1, 2, 3, 4]] {
                    values1234.fulfill()
                }
                if results == [[1, 2, 3, 4], [5, 2, 3, 4]] {
                    values12345234.fulfill()
                }
            }
        )

        first.send(1)
        second.send(2)
        third.send(3)
        fourth.send(4)

        wait(for: [values1234], timeout: 5)
        XCTAssertEqual(results, [[1, 2, 3, 4]])

        first.send(5)

        wait(for: [values12345234], timeout: 5)
        XCTAssertEqual(results, [[1, 2, 3, 4], [5, 2, 3, 4]])

        first.send(completion: .finished)
        [second, third, fourth].forEach {
            $0.send(completion: .finished)
        }

        wait(for: [completedExp], timeout: 5)
    }
}
/// Stress-tests `combineLatest()` over four subjects whose values are sent
/// from different global queues (no `receive(on:)` hop in the pipeline).
///
/// Every iteration sends one value per subject from its own queue, waits for
/// the first combined value `[1, 2, 3, 4]`, replaces the first subject's value
/// with 5 from a background queue, waits for `[5, 2, 3, 4]`, then finishes all
/// four subjects and waits for completion.
func testCollectionCombineLatestWithFinishedEvent_threads_2() {
    // Stop at the first failure: otherwise a single lost value lets the loop
    // run through all remaining iterations, each burning further 5 s timeouts.
    continueAfterFailure = false

    for x in 0 ... 1000 {
        print("--- iteration \(x)")

        let first = PassthroughSubject<Int, Never>()
        let second = PassthroughSubject<Int, Never>()
        let third = PassthroughSubject<Int, Never>()
        let fourth = PassthroughSubject<Int, Never>()

        let completedExp = expectation(description: "completed")
        let values1234 = expectation(description: "1, 2, 3, 4")
        // Description matches the values actually checked below.
        let values12345234 = expectation(description: "1, 2, 3, 4, 5, 2, 3, 4")

        var results = [[Int]]()

        subscription = [first, second, third, fourth]
            .combineLatest()
            .sink(
                receiveCompletion: { _ in completedExp.fulfill() },
                receiveValue: { combined in
                    results.append(combined)
                    // Each comparison can only match once, so no expectation
                    // is fulfilled more than once.
                    if results == [[1, 2, 3, 4]] {
                        values1234.fulfill()
                    }
                    if results == [[1, 2, 3, 4], [5, 2, 3, 4]] {
                        values12345234.fulfill()
                    }
                }
            )

        // Each initial value is sent from a different global queue.
        DispatchQueue.global(qos: .background).async {
            first.send(1)
        }
        DispatchQueue.global(qos: .userInteractive).async {
            second.send(2)
        }
        DispatchQueue.global(qos: .utility).async {
            third.send(3)
        }
        DispatchQueue.global(qos: .userInitiated).async {
            fourth.send(4)
        }

        wait(for: [values1234], timeout: 5)
        XCTAssertEqual(results, [[1, 2, 3, 4]])

        DispatchQueue.global(qos: .userInitiated).async {
            first.send(5)
        }

        wait(for: [values12345234], timeout: 5)
        XCTAssertEqual(results, [[1, 2, 3, 4], [5, 2, 3, 4]])

        first.send(completion: .finished)
        [second, third, fourth].forEach {
            $0.send(completion: .finished)
        }

        wait(for: [completedExp], timeout: 5)
    }
}