Intro
Combine is a framework made by Apple designed to support us in writing code that could be way more complex if written in an imperative way. It’s often said that with great power comes great responsibility. Therefore, as developers, it’s essential for us to understand how to harness it, so it does not backfire.
Today, we’re going to take a closer look 👀 at a few Combine operators, showcasing their practical application.
Operators
map
public func map<T>(
_ transform: @escaping (Self.Output) -> T
) -> Publishers.Map<Self, T>
In Combine, the map operator transforms each value from the upstream applying the provided transformation closure.
The definition might sound complex but let’s have a quick look how simple it is in practice ⤵️.
[1, 2, 3]
.publisher
.map { String("Number: \($0)") }
.sink(receiveValue: { print("RECEVIED VALUE: \($0)") })
.store(in: &cancellables)
Console output
RECEVIED VALUE: Number: 1 RECEVIED VALUE: Number: 2 RECEVIED VALUE: Number: 3
In the above example, we map Int
values received from the upstream to String
values and then we print them in the receiveValue
closure.
Easy, right? Let’s jump to flatMap
then!
flatMap
public func flatMap<T, P>(
maxPublishers: Subscribers.Demand = .unlimited,
_ transform: @escaping (Self.Output) -> P
) -> Publishers.FlatMap<P, Self> where T == P.Output, P : Publisher, Self.Failure == P.Failure
The flat map operator transforms each value from the upstream into a new Publisher applying the provided transformation closure.
Clear? Yass 🤯
Check out the image for a more detailed explanation ⤵️.
- (1) - events in a stream. The first emitted event is number 1
- (2) - stream completion
- (3) - streams
- (4) - each new upstream event triggers subscription to a new stream
- (5) - events observed by the
flatMap
subscriber - (6) - each of upstream events is republished to the down stream
Still unclear? I come to the rescue 🛟 with the code example ⤵️.
var cancellables = Set<AnyCancellable>()
func request(number: Int) -> AnyPublisher<Int, Never> { // 1
Just(number)
.delay(for: .seconds((1...2).randomElement()!), scheduler: DispatchQueue.main) // 2
.handleEvents(
receiveSubscription: { _ in print("REQUEST \(number) STARTED") },
receiveCompletion: {
switch $0 {
case .finished:
print("REQUEST: \(number) FINISHED")
case .failure:
fatalError()
}
},
receiveCancel: { print("REQUEST \(number) CANCELLED") }
)
.eraseToAnyPublisher()
}
Array([1, 2, 3]) // 3
.publisher // 4
.flatMap { number in request(number: number) } // 5
.sink( // 6
receiveCompletion: {
switch $0 {
case .finished:
print("STREAM FINISHED")
case .failure:
fatalError()
}
},
receiveValue: { value in print("RECEIVED VALUE \(value)") }
)
.store(in: &cancellables)
Console output
REQUEST 1 STARTED REQUEST 2 STARTED REQUEST 3 STARTED RECEIVED VALUE 1 REQUEST 1 FINISHED RECEIVED VALUE 3 REQUEST 3 FINISHED RECEIVED VALUE 2 REQUEST 2 FINISHED STREAM FINISHED
- // 1 -
request
function emits a single value (usingJust
publisher) representing the result of a faked API request. The result is published with a delay to simulate the real async request - // 2 -
delay
operator delaying the flow of elements through the stream for a given amount of time and publishing using a specified scheduler - // 3 - definition of static array of elements
- // 4 -
publisher
operator transforming a static array of elements into a stream of the array elements. Each element is then sent down the stream one by one (one event per each element) - // 5 -
flatMap
operator taking each element from the upstream and transforms it into a new stream. In our case, it takes each element of the array (// 3) and transforms it into a new stream using therequest
function (// 1) - // 6 -
sink
operator attaching the subscriber to the stream and observes elements published by it. In our case, it observes and prints out each received value and the stream completion
What can be a real life usage of flatMap
?
Imagine that you have a messaging app. To get a single thread with all messages you need to perform one request, so in case you have 10 threads, you perform 10 requests, to get each thread.
pseudocode ⤵️
getThreadsIDs() // 1
.flatMap(\.publisher)
.map(ThreadDetailsRequest.init) // 2
.flatMap { request in
apiService.execute(request: request) // 3
.catch { _ in Empty(completeImmediately: true) } // 4
}
.sink(receiveValue: { response in threadDetailsStorage.insert(response.threadDetails) }) // 5
.store(in: &cancellables)
- // 1 - stream that publishes the Array of threads IDs (
AnyPublisher<[String], Error>
) - // 2 - mapping ID (
String
) to a request model - // 3 - each request is converted into a new publisher that represents API request
- // 4 -
flatMap
does not ignore errors! In that casecatch
operator intercepts an error, and replaces it withEmpty
publisher. Empty publisher does not emit any error or element into downstream, but completes the stream. In case we don’t handle errors, the main stream would be completed with error and no more events would be observed - // 5 -
sink
operator that attaches subscriber to the stream and observe elements in the main stream. In our case we save fetched thread details into local cache
map + switchToLatest
public func switchToLatest() -> Publishers.SwitchToLatest<Self.Output, Self>
According to the docs ⤵️
Republishes elements sent by the most recently received publisher.
Check out the schema for more details ⤵️
(most of the elements are already described on the flatMap
schema 👀)
- (1) - when a new publisher arrives from the upstream,
switchToLatest
cancels the previous publisher subscription and subscribes to a new one
Code example ⤵️
var cancellables = Set<AnyCancellable>()
func request(number: Int) -> AnyPublisher<Int, Never> {
Just(number)
.delay(for: .seconds((1...2).randomElement()!), scheduler: DispatchQueue.main)
.handleEvents(
receiveSubscription: { _ in print("REQUEST \(number) STARTED") },
receiveCompletion: {
switch $0 {
case .finished:
print("REQUEST \(number) FINISHED")
case .failure:
fatalError()
}
},
receiveCancel: { print("REQUEST \(number) CANCELLED") }
)
.eraseToAnyPublisher()
}
Array([1, 2, 3])
.publisher
.map { number in request(number: number) } // 1
.switchToLatest() // 2
.sink(
receiveCompletion: {
switch $0 {
case .finished:
print("STREAM FINISHED")
case .failure:
fatalError()
}
},
receiveValue: { value in print("RECEIVED VALUE \(value)") }
)
.store(in: &cancellables)
Console output
REQUEST 1 STARTED REQUEST 1 CANCELLED REQUEST 2 STARTED REQUEST 2 CANCELLED REQUEST 3 STARTED RECEIVED VALUE 3 REQUEST 3 FINISHED STREAM FINISHED
- // 1 - conversion of each number to
AnyPublisher<Int, Never>
representing an API request - // 2 -
switchToLatest
subscribes to the latest publisher emitted from the upstream and cancels the previous subscription
Real-life use case for map + switchToLatest
pseudocode ⤵️
searchBar
.publisher(for: .text) // 1
.map(SearchRequest.init) // 2
.map { request in
apiService.execute(request: request) // 3
.catch { _ in Empty(completeImmediately: true) } // 4
}
.switchToLatest() // 5
.sink(receiveValue: { result in dataSource = result.searchResults }) // 6
.store(in: &cancellables)
- // 1 - publisher of elements from the search bar text field
- // 2 - each text element is mapped to the request model (
SearchRequest
) - // 3 - each request is converted into a new publisher representing the API request
- // 4 -
switchToLatest
does not ignore errors! In that casecatch
operator intercepts an error and replaces it withEmpty
publisher. Empty publisher does not emit any error or element downstream but completes the stream. In case errors are not handled, the mainstream would be completed with error and no more events would be observed - // 5 - subscribes to the latest publisher from the upstream and cancels the previous subscription
- // 6 -
sink
operator attaches the subscriber to the stream and observes elements in the mainstream. In our case, the data source is modified by assigning the request result.
What’s the difference comparing to the flatMap
?
flatMap
republishes all publisher’s events and map + switchToLatest
only the latest one (previous ones are cancelled).
What’s the benefit of using map + switchToLatest
?
Using map + switchToLatest
may help you with reducing API operations when the previous operation becomes redundant.
Final Takeaways 🧠
- Think about your use case and a stream behavior that you want to achieve.
flatMap
subscribes to each new publisher and lets them all republish elements to the downstream.map + switchToLatest
lets the latest publisher republish elements to the downstream, the rest of the streams are cancelled. It may help you with reducing redundant API requests.flatMap
,map + switchToLatest
do not ignore errors. In case any publisher fails it’ll end the mainstream and no more events will be published. To make sure it does not happen to you, remember about error handling.- When a stream is cancelled no more events will be published (including completion).
Thanks for reading. 📖
I hope you found it useful!
If you enjoy the topic don’t forget to follow me on one of my social media - LinkedIn, X, Mastodon, Bluesky or via RSS feed to keep up to speed. 🚀