Combine之自定义Operator

06/26/2022 13:12 下午 posted in  apple

https://github.com/agelessman/FuckingSwiftUI

自定义Operator是整个Combine教程中难度最高的内容,因为它连接了Publisher和Subscriber,起到了一个中间桥梁的作用。

那么难点在哪里呢?我希望读者朋友能够带着下边3个问题来仔细读这篇文章:

  • 如何接收上游Publisher的数据?
  • 下游可能是Operator,也可能是Subscriber,如何处理这种情况?
  • 当下游是Subscriber时,如何接收其请求,并传播给上游?

上边3个问题就是本文的核心,下边的讲解的代码来自CombineExt

查看全部Combine教程,请访问:FuckingSwiftUI

最简单的自定义Operator

所谓的组合就是指使用已有的Publisher和Operator组合成具有新功能的Operator,举个例子:

public extension Publisher where Output: Collection {

    func mapMany<Result>(_ transform: @escaping (Output.Element) -> Result) -> Publishers.Map<Self, [Result]> {
        map { $0.map(transform) }
    }
}

上边代码中的.mapMany()就是通过组合生成的一个新的Operator,它的用法如下:

let intArrayPublisher = PassthroughSubject<[Int], Never>()

intArrayPublisher
   .mapMany(String.init)
  .sink(receiveValue: { print($0) })

intArrayPublisher.send([10, 2, 2, 4, 3, 8])

// Output: ["10", "2", "2", "4", "3", "8"]

可以看出,.mapMany()的功能就是按照给出的规则映射Collection中的所有元素,上边的代码是非常简单的,我们可以模仿这种模式来组合生成任何其他的Operator。

有意思的一点是,.mapMany()输出类型通过代码public extension Publisher where Output: Collection约束成了Collection。也就是说该Operator的输入数据必须是Collection。

当然,大多数情况下没必要像上边这样写代码,这个看个人的喜好,上边的代码与下边的代码等价:

let intArrayPublisher = PassthroughSubject<[Int], Never>()

cancellable = intArrayPublisher
    .map {
        $0.map { String($0) }
    }
  .sink(receiveValue: { print($0) })

intArrayPublisher.send([10, 2, 2, 4, 3, 8])

完全自定义Operator

我们将会使用CombineExt中的amb来演示如何自定义Operator,要想弄明白本文的内容,前提条件是对Combine有一定的了解,对CombineExt有一定的研究,迫切想知道如何自定义Operator。再回到amb,它是一个非常有意思的Operator,我们先看看它的用法:

let subject1 = PassthroughSubject<Int, Never>()
let subject2 = PassthroughSubject<Int, Never>()

subject1
  .amb(subject2)
  .sink(receiveCompletion: { print("amb: completed with \($0)") },
        receiveValue: { print("amb: \($0)") })

subject2.send(3) // Since this subject emit first, it becomes the active publisher
subject1.send(1)
subject2.send(6)
subject1.send(8)
subject1.send(7)

subject1.send(completion: .finished)
// Only when subject2 finishes, amb itself finishes as well, since it's the active publisher
subject2.send(completion: .finished)

打印结果:

amb: 3
amb: 6
amb: completed with .finished

从上边的代码可以看出,subject1subject2谁先发送数据谁就会被激活,另一个则被忽略,这种行为很像是淘汰赛,只有第一名才会被保留。

这个Operator特别适合讲解如何自定义Operator,因为它的用法不算复杂,接下来我们就进入正题。

要想讲述清楚amb的创作过程,我们需要反向推演,我们先看看当我们调用了下边代码后,是怎样的一个过程:

subject1
  .amb(subject2)
public extension Publisher {
    func amb<Other: Publisher>(_ other: Other)
        -> Publishers.Amb<Self, Other> where Other.Output == Output, Other.Failure == Failure {
        Publishers.Amb(first: self, second: other)
    }
}

从上边的代码中,我们可以分析出以下几点信息:

  • amb()函数的入参必须是一个Publisher,这算是一个约束条件
  • amb()函数的返回值是Publishers.Amb,同样也是一个Publisher,后边给出的约束条件约束了这两个Publisher的输入和输出类型必须相同

从上边的代码可以看出,所谓的Operator就是Publisher协议的一个extension,因此我们能够获取到当前的Publisher,然后这个函数中需要返回一个Publisher,这样就实现了链式调用。

因此,现在的问题指向了Publishers.Amb,我们需要解决的问题是:如何处理上边提到的淘汰逻辑?如何响应Subscriber的订阅和请求?

我们看看Publishers.Amb的代码:

public extension Publishers {
    struct Amb<First: Publisher, Second: Publisher>: Publisher where First.Output == Second.Output, First.Failure == Second.Failure {
        public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
            subscriber.receive(subscription: Subscription(first: first,
                                                          second: second,
                                                          downstream: subscriber))
        }

        public typealias Output = First.Output
        public typealias Failure = First.Failure

        private let first: First
        private let second: Second

        public init(first: First,
                    second: Second) {
            self.first = first
            self.second = second
        }
    }
}

代码看起来非常简单,只是持有了这2个Publisher,由于Amb实现了Publisher协议,那么重点就在于如何处理订阅的逻辑了:

Subscription(first: first,
            second: second,
            downstream: subscriber)

在以前的文章中,我们提到过,Subscription是沟通Publisher和Subscriber的一座桥梁,因此,这个Subscription里边的逻辑就显得非常重要。

我们看看它的代码:

private extension Publishers.Amb {
    class Subscription<Downstream: Subscriber>: Combine.Subscription where Output == Downstream.Input, Failure == Downstream.Failure {
        private var firstSink: Sink<First, Downstream>?
        private var secondSink: Sink<Second, Downstream>?
        private var preDecisionDemand = Subscribers.Demand.none
        private var decision: Decision? {
            didSet {
                guard let decision = decision else { return }
                switch decision {
                case .first:
                    secondSink = nil
                case .second:
                    firstSink = nil
                }

                request(preDecisionDemand)
                preDecisionDemand = .none
            }
        }

        init(first: First,
             second: Second,
             downstream: Downstream) {
            self.firstSink = Sink(upstream: first,
                                  downstream: downstream) { [weak self] in
                                guard let self = self,
                                      self.decision == nil else { return }

                                self.decision = .first
                             }

            self.secondSink = Sink(upstream: second,
                                   downstream: downstream) { [weak self] in
                                guard let self = self,
                                      self.decision == nil else { return }

                                self.decision = .second
                              }
        }

        func request(_ demand: Subscribers.Demand) {
            guard decision != nil else {
                preDecisionDemand += demand
                return
            }

            firstSink?.demand(demand)
            secondSink?.demand(demand)
        }

        func cancel() {
            firstSink = nil
            secondSink = nil
        }
    }
}

上边的代码比较长,我们拆分一下,我们先看初始化方法:

init(first: First,
     second: Second,
     downstream: Downstream) {
    self.firstSink = Sink(upstream: first,
                          downstream: downstream) { [weak self] in
                        guard let self = self,
                              self.decision == nil else { return }

                        self.decision = .first
                     }

    self.secondSink = Sink(upstream: second,
                           downstream: downstream) { [weak self] in
                        guard let self = self,
                              self.decision == nil else { return }

                        self.decision = .second
                      }
}

downstream在这里就是Subscriber,Sink我们先别管,下边会解释,现在只需要把它当作一个新的桥梁,它能够连接Publisher和Subscriber。

上边firstSink的Sink初始化函数中的闭包的调用时机是: 当第一次收到first这个Publisher的事件时调用,不管是收到数据还是收到完成事件,这个我们在后续讲解Sink的时候会讲解。

同理,secondSink跟firstSink差不多,在上边的初始化函数中,我们就找到了上边第一个问题的答案,当第一次收到first或second的事件后,就为decision赋值了,decision是一个enum,因此他是可以区分是first还是second。

private enum Decision {
    case first
    case second
}

到目前为止,大家应该仍然是糊涂的,因为大家对Sink还不是很了解, 我们必须先把这个Sink讲解了才能继续下去:

class Sink<Upstream: Publisher, Downstream: Subscriber>: Subscriber {
    typealias TransformFailure = (Upstream.Failure) -> Downstream.Failure?
    typealias TransformOutput = (Upstream.Output) -> Downstream.Input?

    private(set) var buffer: DemandBuffer<Downstream>
    private var upstreamSubscription: Subscription?
    private let transformOutput: TransformOutput?
    private let transformFailure: TransformFailure?

    init(upstream: Upstream,
         downstream: Downstream,
         transformOutput: TransformOutput? = nil,
         transformFailure: TransformFailure? = nil) {
        self.buffer = DemandBuffer(subscriber: downstream)
        self.transformOutput = transformOutput
        self.transformFailure = transformFailure
        upstream.subscribe(self)
    }

    func demand(_ demand: Subscribers.Demand) {
        let newDemand = buffer.demand(demand)
        upstreamSubscription?.requestIfNeeded(newDemand)
    }

    func receive(subscription: Subscription) {
        upstreamSubscription = subscription
    }

    func receive(_ input: Upstream.Output) -> Subscribers.Demand {
        ...
    }

    func receive(completion: Subscribers.Completion<Upstream.Failure>) {
        ...
    }

    func cancelUpstream() {
        upstreamSubscription.kill()
    }

    deinit { cancelUpstream() }
}

我省略了一些不重要的代码,我们仔细分析下上边的代码:

  • Sink实现了Subscriber协议,这说明了它本身就是一个订阅者,通常我们用它订阅upstream,这么做的目的是方便操作upstream输出的数据和request。
  • DemandBuffer我们之前的文章已经讲过了,它做数据管理,只复杂把数据发送给downstream
  • transformOutputtransformFailure数据转换函数,我们这里不讲了

Sink的核心思想就是通过亲自订阅上游的Publisher来接收数据和事件,通过DemandBuffer来管理这些数据和事件,当需要时,发送给下游的订阅者。

上边Sink的设计很重要,它是一个中间过程,本质上是因为它本身就是一个Subscriber订阅者,因此不仅能够获取到上游的数据,还剩自己控制发送rquest。

过程讲解

我们再重复一遍这个过程,先看下图:

当执行下边代码时,究竟发生了什么?

subject1
  .amb(subject2)
  .sink(receiveCompletion: { print("amb: completed with \($0)") },
        receiveValue: { print("amb: \($0)") })

subject1就是上图中的Publisher,.amb()返回了上图中的Amb, 当上边代码中调用了.sink()后,Amb就收到了订阅,会调用下边的代码:

public extension Publishers {
    struct Amb<First: Publisher, Second: Publisher>: Publisher where First.Output == Second.Output, First.Failure == Second.Failure {
        public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
            subscriber.receive(subscription: Subscription(first: first,
                                                          second: second,
                                                          downstream: subscriber))
        }
    }
}

当收到订阅后,需要返回一个subscription,也就是订阅凭证,因为后边的Subscriber需要使用这个凭证来发送请求或者取消pipline。

由于上图中绿色的.sink()是系统方法,我们无法看到实现,但是,我们知道,当.sink()收到订阅凭证后就会发送request,也就是上图中的紫色虚线。

请注意,Amb里边的内容完全是我们自定义的,所以我们能够完全控制,当收到.sink()的request后,会调用Subscription下边的函数:

private extension Publishers.Amb {
    class Subscription<Downstream: Subscriber>: Combine.Subscription where Output == Downstream.Input, Failure == Downstream.Failure {
        ...

        func request(_ demand: Subscribers.Demand) {
            guard decision != nil else {
                preDecisionDemand += demand
                return
            }

            firstSink?.demand(demand)
            secondSink?.demand(demand)
        }
                ...
    }
}

.sink()传过来的demand的值是.unlimited,表示不限制数据的接收个数,观察上边的代码,decision表示当前使用的Publisher是哪个,subject1或者subject2谁第一个发送数据,decision就指向谁。

由于这个request是收到订阅凭证后立刻发出的,这时候subject1和subject2都没有发送数据,因此decision为nil,上边的代码就把.sink()传过来的demand保存在preDecisionDemand属性中了,后边会把这个demand透传给胜出的Publisher(subject1或subject2)。

那么重点来了,subject1,subject2竞争的代码在什么地方呢?答案是放在了上边Subscription初始化方法中了:

private extension Publishers.Amb {
    class Subscription<Downstream: Subscriber>: Combine.Subscription where Output == Downstream.Input, Failure == Downstream.Failure {
        private var firstSink: Sink<First, Downstream>?
        private var secondSink: Sink<Second, Downstream>?
        private var preDecisionDemand = Subscribers.Demand.none
        private var decision: Decision? {
            didSet {
                guard let decision = decision else { return }
                switch decision {
                case .first:
                    secondSink = nil
                case .second:
                    firstSink = nil
                }

                request(preDecisionDemand)
                preDecisionDemand = .none
            }
        }

        init(first: First,
             second: Second,
             downstream: Downstream) {
            self.firstSink = Sink(upstream: first,
                                  downstream: downstream) { [weak self] in
                                guard let self = self,
                                      self.decision == nil else { return }

                                self.decision = .first
                             }

            self.secondSink = Sink(upstream: second,
                                   downstream: downstream) { [weak self] in
                                guard let self = self,
                                      self.decision == nil else { return }

                                self.decision = .second
                              }
        }

        ...
    }
}

还记得Subscription什么时候初始化吗?就是当收到.sink()的订阅后创建的。上边的init()很简单,分别创建了两个Sink,firstSink代表subject1,secondSink代表subject2。

在上边的小节中,我们已经知道,Sink的闭包参数的调用时机是当收到第一个参数时调用,再结合上边的代码,我们就可以看出,当firstSink或者secondSink其中一个第一次收到数据后,就决定了decision的值,并且在decision的didSet中,这时候就选中了哪个Publisher作为发送数据的Publisher,另一个则赋值为nil,之后我们重新调用了request(preDecisionDemand),把之前保存的demand透传给胜出的Publisher。

此时此刻,我们的头脑中应该有两个疑问:

  1. 假设firstSink胜出了,那么调用firstSink?.demand(demand)是如何实现把demand透传subject1的?
  2. Sink是如何接收subject1或者subject2的数据的?

这两个问题的核心都指向了Sink,注意,这个Sink很有意思,本文的最上边也讲到了,它实现了Subscriber协议,这一点很重要,我们看看它的初始化方法中干了啥?

init(upstream: Upstream,
     downstream: Downstream,
     transformOutput: TransformOutput? = nil,
     transformFailure: TransformFailure? = nil) {
    self.buffer = DemandBuffer(subscriber: downstream)
    self.transformOutput = transformOutput
    self.transformFailure = transformFailure
    upstream.subscribe(self)
}

看明白了吗?由于Sink本身就是一个Subscriber,因此,它订阅了传进来的上游Publisher。

func receive(subscription: Subscription) {
    upstreamSubscription = subscription
}

并且能够拿到上游Publisher传过来的subscription,因此可以使用这个subscription发送request。

到此为止,上边的两个问题的答案已经呼之欲出了。

总结一下,Amb中自定义的Subscription作为沟通下游.sink()的桥梁接收request,Subscription中持有的Sink订阅了上游的Publisher,它作为Publisher和.sink()的中间桥梁,透传demand和数据。

那么回到开头的3个问题,你有答案了吗?

  • 如何接收上游PubLisher的数据?
  • 下游可能是Publisher或者其他Operator,也可能是Subscriber,如何处理这种情况?
  • 当下游是Subscriber时,如何接收其请求,并传播给上游?

总结

当初特别好奇,Combine中的Operator是如何实现的?因为它确实比较特殊,它的上游是Publisher或者Operator,下游是Operator或Subscriber。本文讲解的内容可以作为一个套路来学习,如果需要自定义Operator,可以参考这篇文章。