在真实的开发中,我们几乎不会去自定义Publisher,但学习本文的知识还是很有必要的,接下来的3篇文章,我会讲解如何自定义Publisher,Operator和Subscriber,我尽量把这些内容讲清楚,通过这3篇文章的学习,能够让大家对Combine的实现原理有一个清晰的认识。
这篇文章的主要代码来源于CombineExt
组合
/// Requests the data at `url` and funnels every failure into `GithubAPIError`.
///
/// Side effects: `networkActivityPublisher` is sent `true` when the data-task
/// publisher receives a demand request, and `false` when it completes or is
/// cancelled.
///
/// Error mapping:
/// - a response that is not an `HTTPURLResponse` becomes `.unknown`;
/// - 401 / 403 / 404 become `.apiError` with a specific reason;
/// - any other 4xx status becomes `.apiError(reason: "client error")`;
/// - any 5xx status becomes `.apiError(reason: "server error")`;
/// - a `URLError` becomes `.networkError(from:)`; anything else becomes `.unknown`.
///
/// - Parameter url: The URL to load.
/// - Returns: A type-erased publisher emitting the response body.
static func fetch(url: URL) -> AnyPublisher<Data, GithubAPIError> {
    return URLSession.shared.dataTaskPublisher(for: url)
        .handleEvents(receiveCompletion: { _ in
            networkActivityPublisher.send(false)
        }, receiveCancel: {
            networkActivityPublisher.send(false)
        }, receiveRequest: { _ in
            networkActivityPublisher.send(true)
        })
        .tryMap { data, response in
            guard let httpResponse = response as? HTTPURLResponse else {
                throw GithubAPIError.unknown
            }
            switch httpResponse.statusCode {
            case 401:
                throw GithubAPIError.apiError(reason: "Unauthorized")
            case 403:
                throw GithubAPIError.apiError(reason: "Resource forbidden")
            case 404:
                throw GithubAPIError.apiError(reason: "Resource not found")
            // Starts at 400 (not 405) so that 400 and 402 are rejected too;
            // the specific cases above are matched first.
            case 400..<500:
                throw GithubAPIError.apiError(reason: "client error")
            case 500..<600:
                throw GithubAPIError.apiError(reason: "server error")
            default:
                break
            }
            return data
        }
        .mapError { error in
            // Errors thrown by `tryMap` above are already in the target type.
            if let err = error as? GithubAPIError {
                return err
            }
            if let err = error as? URLError {
                return GithubAPIError.networkError(from: err)
            }
            return GithubAPIError.unknown
        }
        .eraseToAnyPublisher()
}
上边的代码就是一个使用Operator组合的例子,我们并没有自定义任何Publisher,但最后我们生成了一个AnyPublisher<Data, GithubAPIError>
类型的Publisher。
大家仔细想想,这种通过组合来实现某种功能的方式和自定义Publisher是不是没啥区别?也就是说,在开发中,尽可能的使用这种组合的方式解决问题。
自定义Create(一个新的Publisher)
我们这一小节将会演示一个完整的自定义Publisher的例子,Create
这个Publisher的使用方法跟Combine中的Record
很像,这是一个非常完美的示例:功能相同,却能看到实现代码。Record
的用法如下:
// Example usage of `Record`: the closure is handed a recording object on which
// three string values and then a normal `.finished` completion are registered.
let recordPublisher = Record<String, MyCustomError> { recording in
recording.receive("你")
recording.receive("好")
recording.receive("吗")
recording.receive(completion: Subscribers.Completion.finished)
}
而Create
的用法如下:
// Example usage of `create`: the closure receives a `subscriber` object used to
// push values and a completion, and returns a cancellable for cleanup.
AnyPublisher<String, MyError>.create { subscriber in
// Send values
subscriber.send("Hello")
subscriber.send("World!")
// Complete with an error ...
subscriber.send(completion: .failure(MyError.someError))
// ... or, alternatively, complete successfully
subscriber.send(completion: .finished)
return AnyCancellable {
// Perform cleanup here
}
}
在学习新技术的时候,我们要慢慢学会通过观察代码的使用方法,来尝试推断代码的设计思想。 我们尝试分析一下上边代码的思想:
AnyPublisher<String, MyError>.create
表明create
是AnyPublisher
的一个静态函数,该函数接收一个闭包作为参数
- 闭包的参数
subscriber
至少有两个方法:send()
和send(completion:)
,一个用于发送数据,一个用于发送完成事件
- 闭包返回
AnyCancellable
- 重要的设计思想:使用闭包把数据的发送过程封装起来,当接收到订阅后,调用闭包,触发该过程
接下来,我们从代码层次来进一步分析上边代码的具体实现过程。
// MARK: - Publisher
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public extension Publishers {
    /// A publisher built from a closure. The closure is handed a
    /// `Subscriber` value through which value and completion events can be
    /// sent dynamically.
    ///
    /// The closure returns a `Cancellable`; the subscription created for each
    /// downstream subscriber stores it and cancels it when the subscription
    /// itself is cancelled.
    struct Create<Output, Failure: Swift.Error>: Publisher {
        /// The closure type used to produce events for one subscription.
        public typealias SubscriberHandler = (Subscriber) -> Cancellable

        /// Closure invoked once per subscription to produce events.
        private let factory: SubscriberHandler

        /// Creates the publisher.
        ///
        /// - Parameter factory: Closure that pushes value or completion
        ///   events through the `Subscriber` it receives.
        public init(factory: @escaping SubscriberHandler) {
            self.factory = factory
        }

        /// Attaches `subscriber` by handing it a freshly created subscription
        /// that wraps `factory` and the subscriber itself.
        public func receive<S: Combine.Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
            let subscription = Subscription(factory: factory, downstream: subscriber)
            subscriber.receive(subscription: subscription)
        }
    }
}
当我们要自定义Publisher时,从宏观应该考虑以下2点:
- 写一个
Publishers
的extension
,方便导出类型,比如上边代码中,导出的类型就是Publishers.Create
- 实现
Publisher
协议,其中最核心的是要给subscriber发送一个Subscription
,这个Subscription
是最核心的内容,我们下边详细讲解
在上边的代码中,Create
通过一个闭包(SubscriberHandler = (Subscriber) -> Cancellable
)来进行初始化,我们先研究一下这个闭包,不难看出,闭包的参数是个Subscriber
类型,我们看看它的代码:
public extension Publishers.Create {
    /// The handle passed to a `Create` factory closure. It exposes two `send`
    /// methods, each of which forwards to a closure supplied at
    /// initialization.
    struct Subscriber {
        /// Invoked for every value passed to `send(_:)`.
        private let valueHandler: (Output) -> Void
        /// Invoked for the completion passed to `send(completion:)`.
        private let completionHandler: (Subscribers.Completion<Failure>) -> Void

        /// File-private: only code in this file can build a `Subscriber`.
        fileprivate init(onValue: @escaping (Output) -> Void,
                         onCompletion: @escaping (Subscribers.Completion<Failure>) -> Void) {
            valueHandler = onValue
            completionHandler = onCompletion
        }

        /// Sends a value to the subscriber.
        ///
        /// - Parameter input: The value to send.
        public func send(_ input: Output) {
            valueHandler(input)
        }

        /// Sends a completion event to the subscriber.
        ///
        /// - Parameter completion: Indicates whether publishing finished
        ///   normally or failed with an error.
        public func send(completion: Subscribers.Completion<Failure>) {
            completionHandler(completion)
        }
    }
}
上边的代码,包含了以下几条信息:
- Subscriber本身是一个struct
- 初始化方法是fileprivate的,需要传入两个闭包:
onValue
和onCompletion
(它们被保存为private属性),因此不能在文件外部调用
- 当调用
subscriber.send("Hello")
时,本质上是调用了onValue
- 当调用
subscriber.send(completion: .finished)
时,本质上是调用了onCompletion
总结一下:Subscriber对外暴露了两个函数接口,调用后,会触发闭包,至于闭包中的操作,我们在下文中会讲到。
接下来就是重点了,我们需要自定义Subscription
,数据的处理逻辑都在它里边,它起到了一个承上启下的核心功能。
private extension Publishers.Create {
    /// The subscription handed to a downstream subscriber of `Create`.
    ///
    /// It routes everything the factory closure sends into a `DemandBuffer`
    /// and forwards the downstream's demand to that same buffer.
    class Subscription<Downstream: Combine.Subscriber>: Combine.Subscription where Output == Downstream.Input, Failure == Downstream.Failure {
        /// Holds the downstream subscriber and any values not yet delivered.
        private let buffer: DemandBuffer<Downstream>
        /// The cancellable returned by the factory closure.
        private var cancelable: Cancellable?

        /// Builds the buffer, wires a `Subscriber` to it, then runs `factory`.
        ///
        /// - Parameters:
        ///   - factory: The closure given to `Create`; it is invoked here.
        ///   - downstream: The subscriber that will receive the events.
        init(factory: @escaping SubscriberHandler,
             downstream: Downstream) {
            buffer = DemandBuffer(subscriber: downstream)

            // Both closures capture `self` weakly.
            let forwardValue: (Output) -> Void = { [weak self] value in
                _ = self?.buffer.buffer(value: value)
            }
            let forwardCompletion: (Subscribers.Completion<Failure>) -> Void = { [weak self] completion in
                self?.buffer.complete(completion: completion)
            }

            cancelable = factory(Subscriber(onValue: forwardValue,
                                            onCompletion: forwardCompletion))
        }

        /// Forwards the downstream's demand to the buffer.
        func request(_ demand: Subscribers.Demand) {
            _ = buffer.demand(demand)
        }

        /// Cancels the cancellable returned by the factory closure.
        func cancel() {
            cancelable?.cancel()
        }
    }
}
所谓的自定义Subscription
,就是实现Combine.Subscription
协议,它有2个目的:
- 通过
func request(_ demand: Subscribers.Demand)
接收订阅者的数据请求
- 通过
func cancel()
接收订阅者的取消请求
仔细观察上边的代码就能够发现,Subscription
的Output和Failure类型必须和下游的订阅者匹配上才行,并且引入了一个private let buffer: DemandBuffer<Downstream>
属性作为数据的缓存单元。
self.buffer = DemandBuffer(subscriber: downstream)
上边这行代码显示,DemandBuffer使用downstream进行初始化,别忘了downstream是一个订阅者,也就是subscriber,在这里,大家只需要理解,DemandBuffer持有了subscriber就可以了。
let subscriber = Subscriber(onValue: { [weak self] in _ = self?.buffer.buffer(value: $0) },
onCompletion: { [weak self] in self?.buffer.complete(completion: $0) })
这行代码就和前边讲过的Subscriber
联系上了,它的onValue
闭包绑定了self?.buffer.buffer(value: $0)
,也就是当调用subscriber.send("Hello")
后,实际上的操作是self?.buffer.buffer(value: "Hello")
,同样的道理,它的onCompletion
闭包绑定了self?.buffer.complete(completion: $0)
,也就是当调用subscriber.send(completion: .finished)
后,实际上的操作是self?.buffer.complete(completion: .finished)
。
self.cancelable = factory(subscriber)
这行代码才是Create
初始化参数闭包的真正调用的地方。
大家应该已经发现了吧?自定义Subscription的核心在于如何做好数据管理,我们还需要搞明白DemandBuffer这个东西的实现原理,它在后边两篇文章中,也起到了核心作用。
最后,我们分析一波DemandBuffer的源码:
/// Buffers upstream values for a single downstream subscriber and releases
/// them according to the demand that subscriber has requested.
///
/// All entry points (`buffer(value:)`, `complete(completion:)`, `demand(_:)`)
/// run under the same recursive lock, so mutable state is never touched
/// outside it.
class DemandBuffer<S: Subscriber> {
    /// Recursive, because `buffer(value:)` and `complete(completion:)` call
    /// `flush`, which takes the lock again on the same thread.
    private let lock = NSRecursiveLock()
    /// Values received but not yet delivered downstream.
    private var buffer = [S.Input]()
    /// The downstream subscriber that values and the completion are sent to.
    private let subscriber: S
    /// A completion that has been received but not yet forwarded.
    private var completion: Subscribers.Completion<S.Failure>?
    /// Running demand counters.
    private var demandState = Demand()

    /// Creates a buffer that delivers to `subscriber`.
    init(subscriber: S) {
        self.subscriber = subscriber
    }

    /// Accepts a value from upstream.
    ///
    /// With unlimited requested demand the value is passed straight to the
    /// subscriber; otherwise it is appended to the buffer and a flush is
    /// attempted.
    ///
    /// - Parameter value: The value to deliver or store.
    /// - Returns: The demand returned by the subscriber (unlimited case) or by
    ///   `flush`.
    func buffer(value: S.Input) -> Subscribers.Demand {
        // Take the lock before reading or mutating any state; previously the
        // state check and the append ran unguarded.
        lock.lock()
        defer { lock.unlock() }

        precondition(self.completion == nil,
                     "How could a completed publisher sent values?! Beats me ♂️")

        switch demandState.requested {
        case .unlimited:
            return subscriber.receive(value)
        default:
            buffer.append(value)
            return flush()
        }
    }

    /// Stores the completion event and attempts a flush.
    ///
    /// - Parameter completion: The completion received from upstream.
    func complete(completion: Subscribers.Completion<S.Failure>) {
        // Check-and-set of `completion` happens under the lock.
        lock.lock()
        defer { lock.unlock() }

        precondition(self.completion == nil,
                     "Completion have already occured, which is quite awkward ")

        self.completion = completion
        _ = flush()
    }

    /// Registers additional downstream demand and flushes what it allows.
    ///
    /// - Parameter demand: The demand requested by the subscriber.
    /// - Returns: The demand computed by `flush`.
    func demand(_ demand: Subscribers.Demand) -> Subscribers.Demand {
        flush(adding: demand)
    }

    /// Delivers buffered values while demand allows, then forwards a pending
    /// completion if one is stored.
    ///
    /// - Parameter newDemand: Demand to add before flushing; `nil` when called
    ///   from `buffer(value:)` or `complete(completion:)`.
    /// - Returns: `.none` if nothing was flushed or a completion was
    ///   forwarded; otherwise the part of `requested` not yet reported
    ///   through `sent`.
    private func flush(adding newDemand: Subscribers.Demand? = nil) -> Subscribers.Demand {
        lock.lock()
        defer { lock.unlock() }

        if let newDemand = newDemand {
            demandState.requested += newDemand
        }

        // Continue only if some demand has been requested, or the caller
        // explicitly passed a zero demand. A `nil` newDemand with zero
        // requested demand returns here.
        guard demandState.requested > 0 || newDemand == Subscribers.Demand.none else { return .none }

        // Deliver oldest-first; any demand the subscriber returns from
        // `receive` is added to the requested total.
        while !buffer.isEmpty && demandState.processed < demandState.requested {
            demandState.requested += subscriber.receive(buffer.remove(at: 0))
            demandState.processed += 1
        }

        if let completion = completion {
            // A completion is pending: reset all state (dropping anything
            // still buffered) and forward it downstream.
            buffer = []
            demandState = .init()
            self.completion = nil
            subscriber.receive(completion: completion)
            return .none
        }

        // Report only the demand that has not been reported before.
        let sentDemand = demandState.requested - demandState.sent
        demandState.sent += sentDemand
        return sentDemand
    }
}
上边的代码虽然看上去很长,但内容并不多,我们可以把它分为3个部分:
- 初始化
- 对外接口
- 内部核心逻辑
我们先看看初始化的代码:
private let lock = NSRecursiveLock()
private var buffer = [S.Input]()
private let subscriber: S
private var completion: Subscribers.Completion<S.Failure>?
private var demandState = Demand()
init(subscriber: S) {
self.subscriber = subscriber
}
从属性let lock = NSRecursiveLock()
不难看出,它为数据的操作增加了安全性,这是非常有必要的,因为pipeline专门处理异步数据流。在平时的开发中,我们也可以使用这个锁来保证安全地操作数据,用法如下:
lock.lock()
defer { lock.unlock() }
从属性var buffer = [S.Input]()
可以看出,它内部把数据保存在一个数组之中,数组元素的类型是Subscriber的输入类型。
从属性let subscriber: S
可以看出,它持有了Subscriber,这个在后边的代码中会用到。
之所以把var completion: Subscribers.Completion<S.Failure>?
这个属性保存起来,主要目的是数组buffer中不能存放该类型的数据,因此需要额外保存。
var demandState = Demand()
表示当前的请求状态,它是一个独立的struct,源码如下:
private extension DemandBuffer {
    /// Counters describing the downstream's accumulated demand.
    /// Every counter starts at `.none`.
    struct Demand {
        /// Incremented by one for each value delivered in the flush loop.
        var processed = Subscribers.Demand.none
        /// Total demand accumulated so far.
        var requested = Subscribers.Demand.none
        /// Portion of `requested` already returned from a flush.
        var sent = Subscribers.Demand.none
    }
}
这里的代码可能会让人疑惑,看一下Subscribers.Demand
的定义:
/// A requested number of items, sent to a publisher from a subscriber through the subscription.
@frozen public struct Demand : Equatable, Comparable, Hashable, Codable, CustomStringConvertible {
/// A request for as many values as the publisher can produce.
public static let unlimited: Subscribers.Demand
/// A request for no elements from the publisher.
///
/// This is equivalent to `Demand.max(0)`.
public static let none: Subscribers.Demand
/// Creates a demand for the given maximum number of elements.
///
/// The publisher is free to send fewer than the requested maximum number of elements.
///
/// - Parameter value: The maximum number of elements. Providing a negative value for this parameter results in a fatal error.
@inlinable public static func max(_ value: Int) -> Subscribers.Demand
}
由于它实现了Equatable
,Comparable
和Hashable
这3个协议,所以完全可以把它看作是一个数字,可以进行运算,也可以进行比较,.none
可以看成0,.unlimited
可以看成最大值,也可以用.max
指定一个值。
那么这个值的作用是什么呢?很简单,它表示Subscriber(订阅者)能接受数据的最大个数。 我们看到下边这样的打印结果:
receive subscription: (PassthroughSubject)
request unlimited
就说明Subscriber(订阅者)可以接收任何数量的数据,没有限制。我们再回到前边的代码,var demandState = Demand()
的初始状态都是.none
。
接下来,我们看看第2部分的代码,主要是暴露出的接口,用于给外部调用:
/// Accepts a value from upstream.
///
/// With unlimited requested demand the value is passed straight to the
/// subscriber; otherwise it is appended to the buffer and a flush is attempted.
///
/// - Parameter value: The value to deliver or store.
/// - Returns: The demand returned by the subscriber (unlimited case) or by `flush`.
func buffer(value: S.Input) -> Subscribers.Demand {
    // Take the (recursive) lock before reading or mutating any state;
    // `flush` re-acquires it on the same thread.
    lock.lock()
    defer { lock.unlock() }

    precondition(self.completion == nil,
                 "How could a completed publisher sent values?! Beats me ♂️")

    switch demandState.requested {
    case .unlimited:
        return subscriber.receive(value)
    default:
        buffer.append(value)
        return flush()
    }
}
/// Stores the completion event and attempts a flush.
///
/// - Parameter completion: The completion received from upstream.
func complete(completion: Subscribers.Completion<S.Failure>) {
    // Check-and-set of `completion` happens under the (recursive) lock;
    // `flush` re-acquires it on the same thread.
    lock.lock()
    defer { lock.unlock() }

    precondition(self.completion == nil,
                 "Completion have already occured, which is quite awkward ")

    self.completion = completion
    _ = flush()
}
/// Registers additional downstream demand and flushes what it allows.
///
/// - Parameter demand: The demand requested by the subscriber.
/// - Returns: The demand computed by `flush`.
func demand(_ demand: Subscribers.Demand) -> Subscribers.Demand {
    return flush(adding: demand)
}
buffer()
用于处理缓存数据的部分逻辑,当收到外部的调用后,如果请求是不受限的,就直接发送数据给Subscriber,否则,把数据拼接到数组中,然后调用flush()
complete()
用于接收外部的完成事件,保存后调用flush()
demand()
是一个非常奇妙且重要的方法,它的目的是响应一个Demand请求,然后调用flush()
处理这个响应,本质上这个函数中的参数的Demand请求就是Subscriber(订阅者)的请求。
我发现用文字讲知识确实比较费劲,没有视频好,大家再看看上边的自定义Subscription的代码:
/// Forwards the downstream's demand to the buffer; the returned demand is ignored.
func request(_ demand: Subscribers.Demand) {
    _ = buffer.demand(demand)
}
Subscription实现了Combine.Subscription协议,func request(_ demand: Subscribers.Demand)
正是协议中的方法,该方法会被Subscriber(订阅者)调用。
大家如果有任何疑问,可以留言。我们再看看第3部分的内容:
/// Delivers buffered values while demand allows, then forwards a pending
/// completion if one is stored.
///
/// - Parameter newDemand: Demand to add before flushing; defaults to `nil`.
/// - Returns: `.none` if nothing was flushed or a completion was forwarded;
///   otherwise the part of `requested` not yet reported through `sent`.
private func flush(adding newDemand: Subscribers.Demand? = nil) -> Subscribers.Demand {
lock.lock()
defer { lock.unlock() }
if let newDemand = newDemand {
demandState.requested += newDemand
}
// Continue only if some demand has been requested, or the caller explicitly
// passed a zero demand; otherwise return immediately.
guard demandState.requested > 0 || newDemand == Subscribers.Demand.none else { return .none }
// Deliver oldest-first; any demand the subscriber returns from `receive`
// is added to the requested total.
while !buffer.isEmpty && demandState.processed < demandState.requested {
demandState.requested += subscriber.receive(buffer.remove(at: 0))
demandState.processed += 1
}
if let completion = completion {
// A completion is pending: reset all state (dropping anything still
// buffered) and forward it downstream.
buffer = []
demandState = .init()
self.completion = nil
subscriber.receive(completion: completion)
return .none
}
// Report only the demand that has not been reported before.
let sentDemand = demandState.requested - demandState.sent
demandState.sent += sentDemand
return sentDemand
}
上边代码很简单,就是当requested > 0的时候,把数据发送给Subscriber,DemandBuffer持有Subscriber的目的就是为了后边调用subscriber.receive(buffer.remove(at: 0))
。我们再重新分析一下Subscription的过程:
首先,我们初始化:
/// Builds the buffer, wires a `Subscriber` to it, then runs `factory`.
///
/// - Parameters:
///   - factory: The closure given to `Create`; it is invoked here.
///   - downstream: The subscriber that will receive the events.
init(factory: @escaping SubscriberHandler,
     downstream: Downstream) {
    buffer = DemandBuffer(subscriber: downstream)

    // Both closures capture `self` weakly.
    let forwardValue: (Output) -> Void = { [weak self] value in
        _ = self?.buffer.buffer(value: value)
    }
    let forwardCompletion: (Subscribers.Completion<Failure>) -> Void = { [weak self] completion in
        self?.buffer.complete(completion: completion)
    }

    cancelable = factory(Subscriber(onValue: forwardValue,
                                    onCompletion: forwardCompletion))
}
初始化成功后,buffer
中的.flush()
函数并不会把数据透传给Subscriber,当接收到订阅者的request后,会调用下边的代码:
/// Forwards the downstream's demand to the buffer; the returned demand is ignored.
func request(_ demand: Subscribers.Demand) {
    _ = buffer.demand(demand)
}
然后调用buffer
中的.demand()
函数,.demand()
函数又调用.flush()
,最终遍历数组,把数据全部透传给Subscriber。
如果大家有点蒙,只能多看代码和上边的解释,再细细品一下。
最后,我们回到起始的地方,再回过头来看下边的代码:
public extension AnyPublisher {
    /// Creates a type-erased publisher backed by `Publishers.Create`.
    ///
    /// - Parameter factory: Closure that pushes value or completion events
    ///   through the subscriber it receives and returns a cancellable.
    init(_ factory: @escaping Publishers.Create<Output, Failure>.SubscriberHandler) {
        let created = Publishers.Create<Output, Failure>(factory: factory)
        self = created.eraseToAnyPublisher()
    }

    /// Static counterpart of `init(_:)`.
    ///
    /// - Parameter factory: Closure that pushes value or completion events
    ///   through the subscriber it receives and returns a cancellable.
    /// - Returns: A type-erased publisher wrapping the factory.
    static func create(_ factory: @escaping Publishers.Create<Output, Failure>.SubscriberHandler)
        -> AnyPublisher<Output, Failure> {
        return AnyPublisher(factory)
    }
}
总结
自定义Publisher的关键是自定义Subscription,Subscription又通过DemandBuffer管理数据,DemandBuffer的核心思想是把数据放入到数组中,然后通过func demand(_ demand: Subscribers.Demand) -> Subscribers.Demand
释放数据。