自定义Operator是整个Combine教程中难度最高的内容,因为它连接了Publisher和Subscriber,起到了一个中间桥梁的作用。
那么难点在哪里呢?我希望读者朋友能够带着下边3个问题来仔细读这篇文章:
- 如何接收上游Publisher的数据?
- 下游可能是Operator,也可能是Subscriber,如何处理这种情况?
- 当下游是Subscriber时,如何接收其请求,并传播给上游?
上边3个问题就是本文的核心,下边的讲解的代码来自CombineExt
查看全部Combine教程,请访问:FuckingSwiftUI
最简单的自定义Operator
所谓的组合就是指使用已有的Publisher和Operator组合成具有新功能的Operator,举个例子:
public extension Publisher where Output: Collection {
func mapMany<Result>(_ transform: @escaping (Output.Element) -> Result) -> Publishers.Map<Self, [Result]> {
map { $0.map(transform) }
}
}
上边代码中的.mapMany()
就是通过组合生成的一个新的Operator,它的用法如下:
let intArrayPublisher = PassthroughSubject<[Int], Never>()
intArrayPublisher
.mapMany(String.init)
.sink(receiveValue: { print($0) })
intArrayPublisher.send([10, 2, 2, 4, 3, 8])
// Output: ["10", "2", "2", "4", "3", "8"]
可以看出,.mapMany()
的功能就是按照给出的规则映射Collection中的所有元素,上边的代码是非常简单的,我们可以模仿这种模式来组合生成任何其他的Operator。
有意思的一点是,.mapMany()
输出类型通过代码public extension Publisher where Output: Collection
约束成了Collection
。也就是说该Operator的输入数据必须是Collection。
当然,大多数情况下没必要像上边这样写代码,这个看个人的喜好,上边的代码与下边的代码等价:
let intArrayPublisher = PassthroughSubject<[Int], Never>()
cancellable = intArrayPublisher
.map {
$0.map { String($0) }
}
.sink(receiveValue: { print($0) })
intArrayPublisher.send([10, 2, 2, 4, 3, 8])
完全自定义Operator
我们将会使用CombineExt中的amb
来演示如何自定义Operator,要想弄明白本文的内容,前提条件是对Combine有一定的了解,对CombineExt有一定的研究,迫切想知道如何自定义Operator。再回到amb
,它是一个非常有意思的Operator,我们先看看它的用法:
let subject1 = PassthroughSubject<Int, Never>()
let subject2 = PassthroughSubject<Int, Never>()
subject1
.amb(subject2)
.sink(receiveCompletion: { print("amb: completed with \($0)") },
receiveValue: { print("amb: \($0)") })
subject2.send(3) // Since this subject emit first, it becomes the active publisher
subject1.send(1)
subject2.send(6)
subject1.send(8)
subject1.send(7)
subject1.send(completion: .finished)
// Only when subject2 finishes, amb itself finishes as well, since it's the active publisher
subject2.send(completion: .finished)
打印结果:
amb: 3
amb: 6
amb: completed with .finished
从上边的代码可以看出,subject1
和subject2
谁先发送数据谁就会被激活,另一个则被忽略,这种行为很像是淘汰赛,只有第一名才会被保留。
这个Operator特别适合讲解如何自定义Operator,因为它的用法不算复杂,接下来我们就进入正题。
要想讲述清楚amb
的创作过程,我们需要反向推演,我们先看看当我们调用了下边代码后,是怎样的一个过程:
subject1
.amb(subject2)
public extension Publisher {
func amb<Other: Publisher>(_ other: Other)
-> Publishers.Amb<Self, Other> where Other.Output == Output, Other.Failure == Failure {
Publishers.Amb(first: self, second: other)
}
}
从上边的代码中,我们可以分析出以下几点信息:
amb()
函数的入参必须是一个Publisher,这算是一个约束条件amb()
函数的返回值是Publishers.Amb
,同样也是一个Publisher,后边给出的约束条件约束了这两个Publisher的输入和输出类型必须相同
从上边的代码可以看出,所谓的Operator就是Publisher协议的一个extension,因此我们能够获取到当前的Publisher,然后这个函数中需要返回一个Publisher,这样就实现了链式调用。
因此,现在的问题指向了Publishers.Amb
,我们需要解决的问题是:如何处理上边提到的淘汰逻辑?如何响应Subscriber的订阅和请求?
我们看看Publishers.Amb
的代码:
public extension Publishers {
struct Amb<First: Publisher, Second: Publisher>: Publisher where First.Output == Second.Output, First.Failure == Second.Failure {
public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
subscriber.receive(subscription: Subscription(first: first,
second: second,
downstream: subscriber))
}
public typealias Output = First.Output
public typealias Failure = First.Failure
private let first: First
private let second: Second
public init(first: First,
second: Second) {
self.first = first
self.second = second
}
}
}
代码看起来非常简单,只是持有了这2个Publisher,由于Amb
实现了Publisher协议,那么重点就在于如何处理订阅的逻辑了:
Subscription(first: first,
second: second,
downstream: subscriber)
在以前的文章中,我们提到过,Subscription是沟通Publisher和Subscriber的一座桥梁,因此,这个Subscription里边的逻辑就显得非常重要。
我们看看它的代码:
private extension Publishers.Amb {
class Subscription<Downstream: Subscriber>: Combine.Subscription where Output == Downstream.Input, Failure == Downstream.Failure {
private var firstSink: Sink<First, Downstream>?
private var secondSink: Sink<Second, Downstream>?
private var preDecisionDemand = Subscribers.Demand.none
private var decision: Decision? {
didSet {
guard let decision = decision else { return }
switch decision {
case .first:
secondSink = nil
case .second:
firstSink = nil
}
request(preDecisionDemand)
preDecisionDemand = .none
}
}
init(first: First,
second: Second,
downstream: Downstream) {
self.firstSink = Sink(upstream: first,
downstream: downstream) { [weak self] in
guard let self = self,
self.decision == nil else { return }
self.decision = .first
}
self.secondSink = Sink(upstream: second,
downstream: downstream) { [weak self] in
guard let self = self,
self.decision == nil else { return }
self.decision = .second
}
}
func request(_ demand: Subscribers.Demand) {
guard decision != nil else {
preDecisionDemand += demand
return
}
firstSink?.demand(demand)
secondSink?.demand(demand)
}
func cancel() {
firstSink = nil
secondSink = nil
}
}
}
上边的代码比较长,我们拆分一下,我们先看初始化方法:
init(first: First,
second: Second,
downstream: Downstream) {
self.firstSink = Sink(upstream: first,
downstream: downstream) { [weak self] in
guard let self = self,
self.decision == nil else { return }
self.decision = .first
}
self.secondSink = Sink(upstream: second,
downstream: downstream) { [weak self] in
guard let self = self,
self.decision == nil else { return }
self.decision = .second
}
}
downstream在这里就是Subscriber,Sink
我们先别管,下边会解释,现在只需要把它当作一个新的桥梁,它能够连接Publisher和Subscriber。
上边firstSink的Sink
初始化函数中的闭包的调用时机是: 当第一次收到first这个Publisher的事件时调用,不管是收到数据还是收到完成事件,这个我们在后续讲解Sink
的时候会讲解。
同理,secondSink跟firstSink差不多,在上边的初始化函数中,我们就找到了上边第一个问题的答案,当第一次收到first或second的事件后,就为decision赋值了,decision是一个enum,因此他是可以区分是first还是second。
private enum Decision {
case first
case second
}
到目前为止,大家应该仍然是糊涂的,因为大家对Sink还不是很了解, 我们必须先把这个Sink讲解了才能继续下去:
class Sink<Upstream: Publisher, Downstream: Subscriber>: Subscriber {
typealias TransformFailure = (Upstream.Failure) -> Downstream.Failure?
typealias TransformOutput = (Upstream.Output) -> Downstream.Input?
private(set) var buffer: DemandBuffer<Downstream>
private var upstreamSubscription: Subscription?
private let transformOutput: TransformOutput?
private let transformFailure: TransformFailure?
init(upstream: Upstream,
downstream: Downstream,
transformOutput: TransformOutput? = nil,
transformFailure: TransformFailure? = nil) {
self.buffer = DemandBuffer(subscriber: downstream)
self.transformOutput = transformOutput
self.transformFailure = transformFailure
upstream.subscribe(self)
}
func demand(_ demand: Subscribers.Demand) {
let newDemand = buffer.demand(demand)
upstreamSubscription?.requestIfNeeded(newDemand)
}
func receive(subscription: Subscription) {
upstreamSubscription = subscription
}
func receive(_ input: Upstream.Output) -> Subscribers.Demand {
...
}
func receive(completion: Subscribers.Completion<Upstream.Failure>) {
...
}
func cancelUpstream() {
upstreamSubscription.kill()
}
deinit { cancelUpstream() }
}
我省略了一些不重要的代码,我们仔细分析下上边的代码:
Sink
实现了Subscriber
协议,这说明了它本身就是一个订阅者,通常我们用它订阅upstream,这么做的目的是方便操作upstream输出的数据和request。DemandBuffer
我们之前的文章已经讲过了,它做数据管理,只复杂把数据发送给downstreamtransformOutput
和transformFailure
数据转换函数,我们这里不讲了
Sink的核心思想就是通过亲自订阅上游的Publisher来接收数据和事件,通过DemandBuffer来管理这些数据和事件,当需要时,发送给下游的订阅者。
上边Sink的设计很重要,它是一个中间过程,本质上是因为它本身就是一个Subscriber订阅者,因此不仅能够获取到上游的数据,还剩自己控制发送rquest。
过程讲解
我们再重复一遍这个过程,先看下图:
当执行下边代码时,究竟发生了什么?
subject1
.amb(subject2)
.sink(receiveCompletion: { print("amb: completed with \($0)") },
receiveValue: { print("amb: \($0)") })
subject1就是上图中的Publisher,.amb()
返回了上图中的Amb, 当上边代码中调用了.sink()
后,Amb
就收到了订阅,会调用下边的代码:
public extension Publishers {
struct Amb<First: Publisher, Second: Publisher>: Publisher where First.Output == Second.Output, First.Failure == Second.Failure {
public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
subscriber.receive(subscription: Subscription(first: first,
second: second,
downstream: subscriber))
}
}
}
当收到订阅后,需要返回一个subscription,也就是订阅凭证,因为后边的Subscriber需要使用这个凭证来发送请求或者取消pipline。
由于上图中绿色的.sink()
是系统方法,我们无法看到实现,但是,我们知道,当.sink()
收到订阅凭证后就会发送request,也就是上图中的紫色虚线。
请注意,Amb
里边的内容完全是我们自定义的,所以我们能够完全控制,当收到.sink()
的request后,会调用Subscription
下边的函数:
private extension Publishers.Amb {
class Subscription<Downstream: Subscriber>: Combine.Subscription where Output == Downstream.Input, Failure == Downstream.Failure {
...
func request(_ demand: Subscribers.Demand) {
guard decision != nil else {
preDecisionDemand += demand
return
}
firstSink?.demand(demand)
secondSink?.demand(demand)
}
...
}
}
.sink()
传过来的demand的值是.unlimited
,表示不限制数据的接收个数,观察上边的代码,decision
表示当前使用的Publisher是哪个,subject1或者subject2谁第一个发送数据,decision就指向谁。
由于这个request是收到订阅凭证后立刻发出的,这时候subject1和subject2都没有发送数据,因此decision
为nil,上边的代码就把.sink()
传过来的demand保存在preDecisionDemand属性中了,后边会把这个demand透传给胜出的Publisher(subject1或subject2)。
那么重点来了,subject1,subject2竞争的代码在什么地方呢?答案是放在了上边Subscription初始化方法中了:
private extension Publishers.Amb {
class Subscription<Downstream: Subscriber>: Combine.Subscription where Output == Downstream.Input, Failure == Downstream.Failure {
private var firstSink: Sink<First, Downstream>?
private var secondSink: Sink<Second, Downstream>?
private var preDecisionDemand = Subscribers.Demand.none
private var decision: Decision? {
didSet {
guard let decision = decision else { return }
switch decision {
case .first:
secondSink = nil
case .second:
firstSink = nil
}
request(preDecisionDemand)
preDecisionDemand = .none
}
}
init(first: First,
second: Second,
downstream: Downstream) {
self.firstSink = Sink(upstream: first,
downstream: downstream) { [weak self] in
guard let self = self,
self.decision == nil else { return }
self.decision = .first
}
self.secondSink = Sink(upstream: second,
downstream: downstream) { [weak self] in
guard let self = self,
self.decision == nil else { return }
self.decision = .second
}
}
...
}
}
还记得Subscription什么时候初始化吗?就是当收到.sink()
的订阅后创建的。上边的init()
很简单,分别创建了两个Sink,firstSink
代表subject1,secondSink
代表subject2。
在上边的小节中,我们已经知道,Sink的闭包参数的调用时机是当收到第一个参数时调用,再结合上边的代码,我们就可以看出,当firstSink或者secondSink其中一个第一次收到数据后,就决定了decision的值,并且在decision的didSet
中,这时候就选中了哪个Publisher作为发送数据的Publisher,另一个则赋值为nil,之后我们重新调用了request(preDecisionDemand)
,把之前保存的demand透传给胜出的Publisher。
此时此刻,我们的头脑中应该有两个疑问:
- 假设firstSink胜出了,那么调用
firstSink?.demand(demand)
是如何实现把demand透传subject1的? - Sink是如何接收subject1或者subject2的数据的?
这两个问题的核心都指向了Sink,注意,这个Sink很有意思,本文的最上边也讲到了,它实现了Subscriber
协议,这一点很重要,我们看看它的初始化方法中干了啥?
init(upstream: Upstream,
downstream: Downstream,
transformOutput: TransformOutput? = nil,
transformFailure: TransformFailure? = nil) {
self.buffer = DemandBuffer(subscriber: downstream)
self.transformOutput = transformOutput
self.transformFailure = transformFailure
upstream.subscribe(self)
}
看明白了吗?由于Sink本身就是一个Subscriber,因此,它订阅了传进来的上游Publisher。
func receive(subscription: Subscription) {
upstreamSubscription = subscription
}
并且能够拿到上游Publisher传过来的subscription,因此可以使用这个subscription发送request。
到此为止,上边的两个问题的答案已经呼之欲出了。
总结一下,Amb
中自定义的Subscription
作为沟通下游.sink()
的桥梁接收request,Subscription
中持有的Sink
订阅了上游的Publisher,它作为Publisher和.sink()
的中间桥梁,透传demand和数据。
那么回到开头的3个问题,你有答案了吗?
- 如何接收上游PubLisher的数据?
- 下游可能是Publisher或者其他Operator,也可能是Subscriber,如何处理这种情况?
- 当下游是Subscriber时,如何接收其请求,并传播给上游?
总结
当初特别好奇,Combine中的Operator是如何实现的?因为它确实比较特殊,它的上游是Publisher或者Operator,下游是Operator或Subscriber。本文讲解的内容可以作为一个套路来学习,如果需要自定义Operator,可以参考这篇文章。