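Combine: Custom Publishers

06/26/2022 13:07, posted in apple

In real-world development we almost never write a custom Publisher, but the material here is still well worth learning. Over the next three articles I will explain how to write a custom Publisher, Operator, and Subscriber. I will try to make each of them as clear as I can, so that by the end of the three articles you have a clear picture of how Combine works internally.

Most of the code in this article comes from CombineExt.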

https://github.com/agelessman/FuckingSwiftUI

Composition

/// Fetches data
static func fetch(url: URL) -> AnyPublisher<Data, GithubAPIError> {
    return URLSession.shared.dataTaskPublisher(for: url)
        .handleEvents(receiveCompletion: { _ in
            networkActivityPublisher.send(false)
        }, receiveCancel: {
            networkActivityPublisher.send(false)
        }, receiveRequest: { _ in
            networkActivityPublisher.send(true)
        })
        .tryMap { data, response in
            guard let httpResponse = response as? HTTPURLResponse else {
                throw GithubAPIError.unknown
            }
            switch httpResponse.statusCode {
            case 401:
                throw GithubAPIError.apiError(reason: "Unauthorized")
            case 403:
                throw GithubAPIError.apiError(reason: "Resource forbidden")
            case 404:
                throw GithubAPIError.apiError(reason: "Resource not found")
            case 400..<500: // any other 4xx code (401, 403, 404 are matched above)
                throw GithubAPIError.apiError(reason: "client error")
            case 500..<600:
                throw GithubAPIError.apiError(reason: "server error")
            default: break
            }

            return data
        }
        .mapError { error in
            if let err = error as? GithubAPIError {
                return err
            }
            if let err = error as? URLError {
                return GithubAPIError.networkError(from: err)
            }
            return GithubAPIError.unknown
        }
        .eraseToAnyPublisher()
}

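The code above is an example of composing operators. We did not define any custom Publisher, yet we ended up with a Publisher of type AnyPublisher<Data, GithubAPIError>.

Think about it: building a feature by composition like this achieves much the same result as writing a custom Publisher. In other words, in day-to-day development, prefer composition wherever it solves the problem.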

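Custom Create (a New Publisher)

In this section we walk through a complete custom Publisher. Create is used much like Record in Combine, which makes it an ideal example: the functionality is similar, but here we can read the implementation. Record is used like this: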

let recordPublisher = Record<String, MyCustomError> { recording in
    recording.receive("你")
    recording.receive("好")
    recording.receive("吗")
    recording.receive(completion: Subscribers.Completion.finished)
}

Create is used like this:

AnyPublisher<String, MyError>.create { subscriber in
  // Values
  subscriber.send("Hello")
  subscriber.send("World!")

  // Complete with error
  subscriber.send(completion: .failure(MyError.someError))

  // Or, complete successfully
  subscriber.send(completion: .finished)

  return AnyCancellable { 
    // Perform cleanup
  }
}

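When learning a new technology, it pays to get into the habit of inferring a design from how the code is used. Let's try that with the code above:

  • AnyPublisher<String, MyError>.create tells us that create is a static function on AnyPublisher, and that it takes a closure as its argument
  • The closure's parameter, subscriber, has at least two methods, send() and send(completion:): one sends values, the other sends the completion event
  • The closure returns an AnyCancellable
  • The key design idea: wrap the process of sending data in a closure, and call that closure to start the process once a subscription arrives

Next, let's look at the code to see how this is actually implemented.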

// MARK: - Publisher
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public extension Publishers {
    /// A publisher which accepts a closure with a subscriber argument,
    /// to which you can dynamically send value or completion events.
    ///
    /// You should return a `Cancellable`-conforming object from the closure in
    /// which you can define any cleanup actions to execute when the publisher
    /// completes or the subscription to the publisher is canceled.
    struct Create<Output, Failure: Swift.Error>: Publisher {
        public typealias SubscriberHandler = (Subscriber) -> Cancellable
        private let factory: SubscriberHandler

        /// Initialize the publisher with a provided factory
        ///
        /// - parameter factory: A factory with a closure to which you can
        ///                      dynamically push value or completion events
        public init(factory: @escaping SubscriberHandler) {
            self.factory = factory
        }

        public func receive<S: Combine.Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
            subscriber.receive(subscription: Subscription(factory: factory, downstream: subscriber))
        }
    }
}

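When writing a custom Publisher, there are two things to consider at the top level:

  • Write an extension on Publishers so the type is exported under that namespace; in the code above, the exported type is Publishers.Create
  • Conform to the Publisher protocol. The essential step is to hand the subscriber a Subscription; that Subscription is the heart of the matter, and we cover it in detail below

In the code above, Create is initialized with a closure (SubscriberHandler = (Subscriber) -> Cancellable). Let's look at this closure first. Its parameter is of type Subscriber; note that this is the nested Publishers.Create.Subscriber struct, not the Combine.Subscriber protocol. Here is its code: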

public extension Publishers.Create {
    struct Subscriber {
        private let onValue: (Output) -> Void
        private let onCompletion: (Subscribers.Completion<Failure>) -> Void

        fileprivate init(onValue: @escaping (Output) -> Void,
                         onCompletion: @escaping (Subscribers.Completion<Failure>) -> Void) {
            self.onValue = onValue
            self.onCompletion = onCompletion
        }

        /// Sends a value to the subscriber.
        ///
        /// - Parameter input: The value to send.
        public func send(_ input: Output) {
            onValue(input)
        }

        /// Sends a completion event to the subscriber.
        ///
        /// - Parameter completion: A `Completion` instance which indicates whether publishing has finished normally or failed with an error.
        public func send(completion: Subscribers.Completion<Failure>) {
            onCompletion(completion)
        }
    }
}

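The code above tells us the following:

  • Subscriber itself is a struct
  • Its initializer takes two closures, onValue and onCompletion, and stores them as private properties; the initializer is fileprivate, so it cannot be called from outside the file
  • Calling subscriber.send("Hello") really calls onValue
  • Calling subscriber.send(completion: .finished) really calls onCompletion

To sum up: Subscriber exposes two functions, and calling them triggers the corresponding closures. What those closures do is covered below.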
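
The closure-wrapping idea can be tried out without Combine at all. The sketch below is only an illustration: EventSink, SinkCompletion, and Recorder are hypothetical names made up for this example and are not part of Combine or CombineExt.

```swift
// Hypothetical stand-in for Subscribers.Completion, kept Combine-free.
enum SinkCompletion: Equatable {
    case finished
    case failed(String)
}

// Hypothetical stand-in for Publishers.Create.Subscriber:
// a struct that only forwards calls to two stored closures.
struct EventSink<Value> {
    private let onValue: (Value) -> Void
    private let onCompletion: (SinkCompletion) -> Void

    init(onValue: @escaping (Value) -> Void,
         onCompletion: @escaping (SinkCompletion) -> Void) {
        self.onValue = onValue
        self.onCompletion = onCompletion
    }

    // Sending a value just invokes the stored onValue closure.
    func send(_ value: Value) { onValue(value) }

    // Sending a completion just invokes the stored onCompletion closure.
    func send(completion: SinkCompletion) { onCompletion(completion) }
}

// Whoever builds the sink decides what the closures do.
// Here they simply record what was sent.
final class Recorder {
    var values: [String] = []
    var completion: SinkCompletion?
}

let recorder = Recorder()
let sink = EventSink<String>(
    onValue: { recorder.values.append($0) },
    onCompletion: { recorder.completion = $0 }
)

sink.send("Hello")
sink.send("World!")
sink.send(completion: .finished)
```

The caller of send() never learns where the values end up. That is exactly the freedom the Subscription needs in order to route them into a buffer.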

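Now for the key part. We need a custom Subscription: all of the data-handling logic lives in it, and it is the link between the publisher upstream and the subscriber downstream.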

private extension Publishers.Create {
    class Subscription<Downstream: Combine.Subscriber>: Combine.Subscription where Output == Downstream.Input, Failure == Downstream.Failure {
        private let buffer: DemandBuffer<Downstream>
        private var cancelable: Cancellable?

        init(factory: @escaping SubscriberHandler,
             downstream: Downstream) {
            self.buffer = DemandBuffer(subscriber: downstream)

            let subscriber = Subscriber(onValue: { [weak self] in _ = self?.buffer.buffer(value: $0) },
                                        onCompletion: { [weak self] in self?.buffer.complete(completion: $0) })

            self.cancelable = factory(subscriber)
        }

        func request(_ demand: Subscribers.Demand) {
            _ = self.buffer.demand(demand)
        }

        func cancel() {
            self.cancelable?.cancel()
        }
    }
}

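Writing a custom Subscription means conforming to the Combine.Subscription protocol, which serves two purposes:

  • func request(_ demand: Subscribers.Demand) receives the subscriber's requests for data
  • func cancel() receives the subscriber's cancellation request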
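
To see these two methods in isolation, here is a minimal sketch of a much simpler publisher than Create. Publishers.Single and its nested Inner class are hypothetical names invented for this illustration. The sketch assumes single-threaded use, so it has no locking or buffering.

```swift
import Combine

extension Publishers {
    /// Hypothetical example type (not part of Combine or CombineExt):
    /// emits one stored value, then finishes.
    struct Single<Output>: Publisher {
        typealias Failure = Never
        let value: Output

        func receive<S: Subscriber>(subscriber: S)
        where S.Input == Output, S.Failure == Failure {
            // The publisher's only job: hand the subscriber a subscription.
            subscriber.receive(subscription: Inner(value: value, downstream: subscriber))
        }
    }
}

extension Publishers.Single {
    /// The subscription holds the downstream subscriber and does the real work.
    final class Inner<Downstream: Subscriber>: Subscription
    where Downstream.Input == Output, Downstream.Failure == Never {
        private var downstream: Downstream?
        private let value: Output

        init(value: Output, downstream: Downstream) {
            self.value = value
            self.downstream = downstream
        }

        // Called by the subscriber when it wants data.
        func request(_ demand: Subscribers.Demand) {
            guard demand > 0, let downstream = downstream else { return }
            self.downstream = nil            // deliver at most once
            _ = downstream.receive(value)
            downstream.receive(completion: .finished)
        }

        // Called by the subscriber when it is no longer interested.
        func cancel() {
            downstream = nil
        }
    }
}

var received: [String] = []
var finished = false
let cancellable = Publishers.Single(value: "Hello")
    .sink(receiveCompletion: { _ in finished = true },
          receiveValue: { received.append($0) })
```

Nothing is delivered in receive(subscriber:) itself. The value only flows once sink calls request(_:) on the subscription it was handed.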

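Looking closely at the code above, the Output and Failure types of the Subscription must match those of the downstream subscriber. It also introduces the property private let buffer: DemandBuffer<Downstream> to buffer the data.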

self.buffer = DemandBuffer(subscriber: downstream)

This line shows that DemandBuffer is initialized with downstream. Remember that downstream is a subscriber. For now, all you need to understand is that DemandBuffer holds on to the subscriber.

let subscriber = Subscriber(onValue: { [weak self] in _ = self?.buffer.buffer(value: $0) },
                            onCompletion: { [weak self] in self?.buffer.complete(completion: $0) })

This is where the Subscriber discussed earlier comes in. Its onValue closure is bound to self?.buffer.buffer(value: $0), so calling subscriber.send("Hello") actually performs self?.buffer.buffer(value: "Hello"). Likewise, its onCompletion closure is bound to self?.buffer.complete(completion: $0), so calling subscriber.send(completion: .finished) actually performs self?.buffer.complete(completion: .finished).

self.cancelable = factory(subscriber)

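This line is where the closure passed to Create's initializer is actually called.

As you have probably noticed, the heart of a custom Subscription is managing the data well. We still need to understand how DemandBuffer works; it also plays a central role in the next two articles.

Finally, let's go through the source of DemandBuffer: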

class DemandBuffer<S: Subscriber> {
    private let lock = NSRecursiveLock()
    private var buffer = [S.Input]()
    private let subscriber: S
    private var completion: Subscribers.Completion<S.Failure>?
    private var demandState = Demand()

    init(subscriber: S) {
        self.subscriber = subscriber
    }

    func buffer(value: S.Input) -> Subscribers.Demand {
        precondition(self.completion == nil,
                     "How could a completed publisher sent values?! Beats me")

        switch demandState.requested {
        case .unlimited:
            return subscriber.receive(value)
        default:
            buffer.append(value)
            return flush()
        }
    }


    func complete(completion: Subscribers.Completion<S.Failure>) {
        precondition(self.completion == nil,
                     "Completion have already occured, which is quite awkward")

        self.completion = completion
        _ = flush()
    }


    func demand(_ demand: Subscribers.Demand) -> Subscribers.Demand {
        flush(adding: demand)
    }

    private func flush(adding newDemand: Subscribers.Demand? = nil) -> Subscribers.Demand {
        lock.lock()
        defer { lock.unlock() }

        if let newDemand = newDemand {
            demandState.requested += newDemand
        }

        // If buffer isn't ready for flushing, return immediately
        guard demandState.requested > 0 || newDemand == Subscribers.Demand.none else { return .none }

        while !buffer.isEmpty && demandState.processed < demandState.requested {
            demandState.requested += subscriber.receive(buffer.remove(at: 0))
            demandState.processed += 1
        }

        if let completion = completion {
            // Completion event was already sent
            buffer = []
            demandState = .init()
            self.completion = nil
            subscriber.receive(completion: completion)
            return .none
        }

        let sentDemand = demandState.requested - demandState.sent
        demandState.sent += sentDemand
        return sentDemand
    }
}

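The code above looks long, but there is not much to it. We can split it into three parts:

  • Initialization
  • The public interface
  • The core internal logic

Let's start with the initialization code: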

private let lock = NSRecursiveLock()
private var buffer = [S.Input]()
private let subscriber: S
private var completion: Subscribers.Completion<S.Failure>?
private var demandState = Demand()


init(subscriber: S) {
    self.subscriber = subscriber
}

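The property let lock = NSRecursiveLock() makes operations on the data thread-safe. This is necessary because a pipeline handles asynchronous data streams, so values and requests may arrive from different threads. We can use the same lock in everyday code to protect shared data, like this: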

lock.lock()
defer { lock.unlock() }
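
As a slightly fuller illustration of the same pattern, here is a small sketch of a counter guarded by NSRecursiveLock. SafeCounter is a hypothetical type written for this article only. It also shows why a recursive lock is convenient: a method that already holds the lock can call another locking method on the same thread without deadlocking.

```swift
import Foundation

/// Hypothetical example type: a counter protected by a recursive lock.
final class SafeCounter {
    private let lock = NSRecursiveLock()
    private var value = 0

    func increment() {
        lock.lock()
        defer { lock.unlock() }   // always released, even on early return
        value += 1
    }

    /// Re-enters the lock: increment() locks again on the same thread,
    /// which a recursive lock permits.
    func incrementTwice() {
        lock.lock()
        defer { lock.unlock() }
        increment()
        increment()
    }

    var current: Int {
        lock.lock()
        defer { lock.unlock() }
        return value
    }
}

let counter = SafeCounter()

// Hammer the counter from several threads at once.
DispatchQueue.concurrentPerform(iterations: 1000) { _ in
    counter.incrementTwice()
}
```

With a plain NSLock, incrementTwice() would deadlock on its first call to increment(). DemandBuffer is in a similar position, because delivering a value to the subscriber can lead straight back into the buffer.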

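The property var buffer = [S.Input]() shows that values are stored internally in an array whose element type is the Subscriber's Input type.

The property let subscriber: S shows that it holds on to the Subscriber, which is used in the code further down.

The property var completion: Subscribers.Completion<S.Failure>? is stored separately because the buffer array cannot hold a value of that type.

var demandState = Demand() represents the current demand state. Demand here is a separate struct, whose source is: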

private extension DemandBuffer {
    /// A model that tracks the downstream's
    /// accumulated demand state
    struct Demand {
        var processed: Subscribers.Demand = .none
        var requested: Subscribers.Demand = .none
        var sent: Subscribers.Demand = .none
    }
}

This code may look puzzling, so let's look at the definition of Subscribers.Demand:

/// A requested number of items, sent to a publisher from a subscriber through the subscription.
@frozen public struct Demand : Equatable, Comparable, Hashable, Codable, CustomStringConvertible {

    /// A request for as many values as the publisher can produce.
    public static let unlimited: Subscribers.Demand

    /// A request for no elements from the publisher.
    ///
    /// This is equivalent to `Demand.max(0)`.
    public static let none: Subscribers.Demand

    /// Creates a demand for the given maximum number of elements.
    ///
    /// The publisher is free to send fewer than the requested maximum number of elements.
    ///
    /// - Parameter value: The maximum number of elements. Providing a negative value for this parameter results in a fatal error.
    @inlinable public static func max(_ value: Int) -> Subscribers.Demand
}

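Because it conforms to Equatable, Comparable, and Hashable, and (not shown in this excerpt) also defines arithmetic operators such as + and -, we can treat it much like a number: it supports arithmetic and comparison. .none can be read as 0, .unlimited as the largest possible value, and .max lets you specify a particular value.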
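
A few lines are enough to see this number-like behavior. The constant names below are made up for the example; the operators and the max property come from Combine itself.

```swift
import Combine

let two = Subscribers.Demand.max(2)
let three = Subscribers.Demand.max(3)

// Arithmetic on bounded demands behaves like integer arithmetic.
let sum = two + three
let difference = three - two

// Adding anything to .unlimited stays .unlimited.
let stillUnlimited = Subscribers.Demand.unlimited + three

// .none is the same value as .max(0).
let noneIsZero = Subscribers.Demand.none == .max(0)

// Every bounded demand compares as smaller than .unlimited.
let boundedIsSmaller = three < .unlimited

// The max property exposes the count, or nil for .unlimited.
let sumCount = sum.max
let unlimitedCount = Subscribers.Demand.unlimited.max
```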

So what is this value for? Simply put, it is the maximum number of values the Subscriber is willing to receive. When we see printed output like this:

receive subscription: (PassthroughSubject)
request unlimited

it means the Subscriber can receive any number of values, without limit. Going back to the earlier code, every field of var demandState = Demand() starts out as .none.
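
For contrast, here is a sketch of a subscriber that asks for a bounded number of values. LimitedSubscriber is a hypothetical class written for this illustration. With print() in the pipeline, the logged request reports a bounded demand rather than unlimited.

```swift
import Combine

/// Hypothetical subscriber that asks for at most `limit` values.
final class LimitedSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    let limit: Int
    private(set) var received: [Int] = []

    init(limit: Int) {
        self.limit = limit
    }

    func receive(subscription: Subscription) {
        // This is the demand the publisher side sees in request(_:).
        subscription.request(.max(limit))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        return .none   // do not ask for anything beyond the initial demand
    }

    func receive(completion: Subscribers.Completion<Never>) {}
}

let subject = PassthroughSubject<Int, Never>()
let limited = LimitedSubscriber(limit: 2)
subject.print().subscribe(limited)

subject.send(1)
subject.send(2)
subject.send(3)   // no demand left, so the subject drops this value
```

PassthroughSubject simply discards values that arrive when there is no demand. A publisher like Create keeps them in a buffer instead, which is what DemandBuffer is for.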

Next, let's look at the second part: the interface exposed to callers:

func buffer(value: S.Input) -> Subscribers.Demand {
    precondition(self.completion == nil,
                 "How could a completed publisher sent values?! Beats me")

    switch demandState.requested {
    case .unlimited:
        return subscriber.receive(value)
    default:
        buffer.append(value)
        return flush()
    }
}


func complete(completion: Subscribers.Completion<S.Failure>) {
    precondition(self.completion == nil,
                 "Completion have already occured, which is quite awkward")

    self.completion = completion
    _ = flush()
}


func demand(_ demand: Subscribers.Demand) -> Subscribers.Demand {
    flush(adding: demand)
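}

  • buffer() handles part of the buffering logic. When it is called, if the demand is unlimited it sends the value straight to the Subscriber; otherwise it appends the value to the array and calls flush()
  • complete() receives the completion event from outside, stores it, and calls flush()
  • demand() is a subtle and important method. It responds to a Demand request by calling flush() to process it; the Demand passed in is, in essence, the Subscriber's own request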

I find that explaining this in text is harder than in a video. Take another look at the custom Subscription code above:

func request(_ demand: Subscribers.Demand) {
    _ = self.buffer.demand(demand)
}

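Subscription conforms to the Combine.Subscription protocol, and func request(_ demand: Subscribers.Demand) is one of the protocol's methods; it is called by the Subscriber.

If you have any questions, feel free to leave a comment. Now let's look at the third part: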

private func flush(adding newDemand: Subscribers.Demand? = nil) -> Subscribers.Demand {
    lock.lock()
    defer { lock.unlock() }

    if let newDemand = newDemand {
        demandState.requested += newDemand
    }

    // If buffer isn't ready for flushing, return immediately
    guard demandState.requested > 0 || newDemand == Subscribers.Demand.none else { return .none }

    while !buffer.isEmpty && demandState.processed < demandState.requested {
        demandState.requested += subscriber.receive(buffer.remove(at: 0))
        demandState.processed += 1
    }

    if let completion = completion {
        // Completion event was already sent
        buffer = []
        demandState = .init()
        self.completion = nil
        subscriber.receive(completion: completion)
        return .none
    }

    let sentDemand = demandState.requested - demandState.sent
    demandState.sent += sentDemand
    return sentDemand
}

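The code above is straightforward: when requested > 0, it sends values to the Subscriber, up to the requested amount. The reason DemandBuffer holds the Subscriber is so that it can call subscriber.receive(buffer.remove(at: 0)) here. Let's walk through the Subscription flow once more.

First, we initialize: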

init(factory: @escaping SubscriberHandler,
     downstream: Downstream) {
    self.buffer = DemandBuffer(subscriber: downstream)

    let subscriber = Subscriber(onValue: { [weak self] in _ = self?.buffer.buffer(value: $0) },
                                onCompletion: { [weak self] in self?.buffer.complete(completion: $0) })

    self.cancelable = factory(subscriber)
}

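Once initialization succeeds, flush() in buffer does not yet pass any values on to the Subscriber, because nothing has been requested; values sent at this point stay in the array. When the subscriber's request arrives, the code below is called: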

func request(_ demand: Subscribers.Demand) {
    _ = self.buffer.demand(demand)
}

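This calls demand() on buffer, which in turn calls flush(); flush() then walks the array and passes the buffered values on to the Subscriber, as far as the requested demand allows.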
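
The buffer-then-release flow can be reproduced in a few lines of plain Swift. The sketch below is a deliberately simplified model, not the CombineExt implementation: MiniDemandBuffer is a hypothetical type that counts demand with an Int and leaves out locking, completion handling, and unlimited demand.

```swift
/// Simplified, hypothetical model of a demand buffer.
final class MiniDemandBuffer<Value> {
    private var pending: [Value] = []      // values waiting for demand
    private var remaining = 0              // demand not yet satisfied
    private let deliver: (Value) -> Void   // stands in for subscriber.receive(_:)

    init(deliver: @escaping (Value) -> Void) {
        self.deliver = deliver
    }

    /// A new value arrives from the producer side.
    func buffer(_ value: Value) {
        pending.append(value)
        flush()
    }

    /// The subscriber asks for `count` more values.
    func demand(_ count: Int) {
        remaining += count
        flush()
    }

    /// Deliver buffered values, oldest first, while demand remains.
    private func flush() {
        while !pending.isEmpty && remaining > 0 {
            remaining -= 1
            deliver(pending.removeFirst())
        }
    }
}

var delivered: [String] = []
let miniBuffer = MiniDemandBuffer<String> { delivered.append($0) }

// Values sent before any request stay in the buffer.
miniBuffer.buffer("Hello")
miniBuffer.buffer("World!")
let beforeDemand = delivered

// A request for one value releases exactly one.
miniBuffer.demand(1)
let afterFirstRequest = delivered

// A larger request releases whatever is still buffered.
miniBuffer.demand(5)
let afterSecondRequest = delivered
```

This mirrors the sequence in Create: the factory closure may send values during initialization, but nothing reaches the subscriber until request(_:) hands demand to the buffer.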

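If this is still a little confusing, the best remedy is to reread the code and the explanation above and let it sink in.

Finally, let's return to where we started and look at the code below: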

public extension AnyPublisher {

    init(_ factory: @escaping Publishers.Create<Output, Failure>.SubscriberHandler) {
        self = Publishers.Create(factory: factory).eraseToAnyPublisher()
    }

    static func create(_ factory: @escaping Publishers.Create<Output, Failure>.SubscriberHandler)
        -> AnyPublisher<Output, Failure> {
        AnyPublisher(factory)
    }
}

Summary

The key to a custom Publisher is a custom Subscription. The Subscription manages data through DemandBuffer, whose core idea is to store values in an array and then release them via func demand(_ demand: Subscribers.Demand) -> Subscribers.Demand.