本文虽然主要讲解如何自定义Subscriber,但在真实的开发中是没有必要这样做的,从上图可以看出,Subscriber一共做了3件事:
- 订阅Publisher,
- 发送request
- 接收数据
一般来说,当Subscriber订阅了某个Publisher并收到subscription(订阅凭证)后,会立刻发送request,然后就等待数据就行了。
-
如果想控制订阅的时机,比如说点击了某个按钮后再订阅,那么就在点击了按钮后调用
.sink()
就可以了,没必要自定义sink -
如果想控制发送request的时机,比如说延时5秒发送请求,那也没必要自定义sink,只需延时5秒调用
.sink()
就可以 -
如果想处理数据,那么在闭包里操作就行了,没必要把这个处理细节封装起来
本文只是探讨sink的自定义问题, 目的是让大家学习一下Combine中sink的实现方式。
我们先看看Combine中Sink类的定义:
extension Subscribers {
/// A simple subscriber that requests an unlimited number of values upon subscription.
final public class Sink<Input, Failure> : Subscriber, Cancellable, CustomStringConvertible, CustomReflectable, CustomPlaygroundDisplayConvertible where Failure : Error {
/// The closure to execute on receipt of a value.
final public var receiveValue: (Input) -> Void { get }
/// The closure to execute on completion.
final public var receiveCompletion: (Subscribers.Completion<Failure>) -> Void { get }
final public var description: String { get }
final public var customMirror: Mirror { get }
/// A custom playground description for this instance.
final public var playgroundDescription: Any { get }
public init(receiveCompletion: @escaping ((Subscribers.Completion<Failure>) -> Void), receiveValue: @escaping ((Input) -> Void))
final public func receive(subscription: Subscription)
final public func receive(_ value: Input) -> Subscribers.Demand
final public func receive(completion: Subscribers.Completion<Failure>)
/// Cancel the activity.
final public func cancel()
}
}
从上边的代码可以看出,Sink
是一个实现了Subscriber
,Cancellable
等多个协议的类,因此下边的这些方法都是协议中的方法。
我们比较关心的是Subscriber
协议,既然Sink
实现了该协议,那么我们就可以用它的实例对象来订阅Publisher,像下边这样使用:
let publisher = PassthroughSubject<Int, Never>()
let sink = Subscribers.Sink<Int, Never>(receiveCompletion: {
print($0)
}, receiveValue: {
print($0)
})
publisher.subscribe(sink)
publisher.send(1)
上边的代码等价于:
publisher
.sink(receiveCompletion: {
print($0)
}, receiveValue: {
print($0)
})
我觉得有必要讲解一下为什么上边的代码是等价的,关键在于上边代码中的sink方法:
extension Publisher {
public func sink(receiveCompletion: @escaping ((Subscribers.Completion<Self.Failure>) -> Void), receiveValue: @escaping ((Self.Output) -> Void)) -> AnyCancellable
}
可以看出,首先它是Publisher
协议的方法,因此,所有的Publishers都可以调用,其次,该方法内部只是创建了一个Subscribers.Sink
,然后将其返回即可,代码如下:
extension Publisher {
public func testSink(receiveCompletion: @escaping ((Subscribers.Completion<Self.Failure>) -> Void),
receiveValue: @escaping ((Self.Output) -> Void)) -> AnyCancellable {
let sink = Subscribers.Sink<Self.Output, Self.Failure>(receiveCompletion: {
receiveCompletion($0)
}, receiveValue: {
receiveValue($0)
})
self.subscribe(sink)
return AnyCancellable(sink)
}
}
在上边的代码中,我特意把sink
写成了testSink
做个区分,可以看出,本质上就是在testSink
函数内创建了一个Sink
的实例,因此,我们可以像下边这样使用:
let publisher = PassthroughSubject<Int, Never>()
cancellable = publisher.testSink(receiveCompletion: {
print($0)
}, receiveValue: {
print($0)
})
publisher.send(1)
大家仔细品一品,.sink()
只是对外暴露出的一个简单的函数接口,真正的核心是Sink
,因为它实现了Subscriber
和Cancellable
协议。
那么重点来了,我们就来看看Sink
在这些协议方法中做了什么事?
extension Subscribers {
final public class CustomSink<Input, Failure>: Subscriber, Cancellable where Failure: Error {
let receiveCompletion: (Subscribers.Completion<Failure>) -> Void
let receiveValue: (Input) -> Void
var subscription: Subscription?
init(receiveCompletion: @escaping ((Subscribers.Completion<Failure>) -> Void),
receiveValue: @escaping ((Input) -> Void)) {
self.receiveCompletion = receiveCompletion
self.receiveValue = receiveValue
}
public func receive(subscription: Subscription) {
self.subscription = subscription
self.subscription?.request(.unlimited)
}
public func receive(_ input: Input) -> Subscribers.Demand {
receiveValue(input)
return .none
}
public func receive(completion: Subscribers.Completion<Failure>) {
receiveCompletion(completion)
subscription = nil
}
public func cancel() {
subscription?.cancel()
subscription = nil
}
}
}
CustomSink
就是我们自定义的实现了Subscriber
和Cancellable
协议的类,代码很容易理解,我就不做更多介绍了。值得注意的有以下2点:
- 当收到subscription后,会立刻发送request
receive(_ input: Input)
函数的返回值类型是Subscribers.Demand
,为什么需要给一个返回值呢?原因是当CustomSink
通过该方法收到数据后,可以返回一个值,告诉Publisher当达到接受的最大值时还可以接收更多的值,举个例子,比如说假设我们自定义的CustomSink
接收值不是无限的,而是最多接收3个,那么在发送request时,代码是这样的self.subscription?.request(.max(3))
,这种情况下最多只能接收3个值,我们可以改动一下代码,当receive(_ input: Input)
收到第3个值的时候,我们返回return .max(1)
,这样就能接收4个值了
self.subscription?.request(.max(3))
我们首先把request中的参数设置为最大接收3个值,然后试一下:
let publisher = PassthroughSubject<Int, Never>()
cancellable = publisher.customSink(receiveCompletion: {
print($0)
}, receiveValue: {
print($0)
})
publisher.send(1)
publisher.send(2)
publisher.send(3)
publisher.send(4)
publisher.send(5)
打印结果:
1
2
3
说明最多只能接收3个数据,然后,我们修改一下代码。改动如下:
extension Subscribers {
final public class CustomSink<Input, Failure>: Subscriber, Cancellable where Failure: Error {
...
var count = 0
...
public func receive(_ input: Input) -> Subscribers.Demand {
receiveValue(input)
count += 1
if count == 3 {
return .max(1)
} else {
return .none
}
}
...
}
}
我只增加了一个count
属性来记录当前接收数据的个数,当等于3时,返回了一个return .max(1)
,根据我们上边的解释,这时候就可以额外接收一个数据,打印如下:
1
2
3
4
大家明白了吗?这种方式很灵活,在某些场景下可以像上边那样来增加新的接收的参数。
接下来只需要在Publisher
下暴露出一个接口就可以了:
extension Publisher {
public func customSink(receiveCompletion: @escaping ((Subscribers.Completion<Self.Failure>) -> Void),
receiveValue: @escaping ((Self.Output) -> Void)) -> AnyCancellable {
let sink = Subscribers.CustomSink<Self.Output, Self.Failure>(receiveCompletion: {
receiveCompletion($0)
}, receiveValue: {
receiveValue($0)
})
self.subscribe(sink)
return AnyCancellable(sink)
}
}
总结
总起来说,自定义Subscriber是一件非常简单的事,也是一件不必要的事,Subscriber最核心的思想只是接收数据和事件,对数据和事件不做任何逻辑。