

ReactiveCocoa-RACScheduler底层实现分析
source link: https://chipengliu.github.io/2019/03/26/ReactiveCocoa-RACScheduler/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

RACScheduler
ReactiveCocoa 框架中的调度器,ReactiveCocoa 中的信号可以在 RACScheduler
上执行任务、发送结果; RACScheduler
的实现主要是基于 GCD 封装,提供了 GCD 不具备的特性。
RACScheduler 有 4 个子类:
RACTestScheduler
RACSubscriptionScheduler
RACImmediateScheduler
RACQueueScheduler
接下来会分析平时使用中常接触到的 RACImmediateScheduler、RACQueueScheduler 和 RACSubscriptionScheduler 3个子类
RACScheduler
基类提供了5个构造方法:
1
2
3
4
5
6
7
8
9
10
11
+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority name:(nullable NSString *)name;
+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority;
+ (RACScheduler *)scheduler;
+ (nullable RACScheduler *)currentScheduler;
+ (RACScheduler *)immediateScheduler;
+ (RACScheduler *)mainThreadScheduler;
+schedulerWithPriority:name:
、+schedulerWithPriority:
、+scheduler
三个方法细线逻辑相差不大
1
2
3
4
5
6
7
8
9
10
11
+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority name:(NSString *)name {
return [[RACTargetQueueScheduler alloc] initWithName:name targetQueue:dispatch_get_global_queue(priority, 0)];
}
+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority {
return [self schedulerWithPriority:priority name:@"org.reactivecocoa.ReactiveObjC.RACScheduler.backgroundScheduler"];
}
+ (RACScheduler *)scheduler {
return [self schedulerWithPriority:RACSchedulerPriorityDefault];
}
+schedulerWithPriority:name:
: 指定线程的优先级和名称,返回的 RACTargetQueueScheduler 对象,RACTargetQueueScheduler 是 RACQueueScheduler 的子类+schedulerWithPriority:
: 指定线程的优先级,线程名称设置为 org.reactivecocoa.ReactiveObjC.RACScheduler.backgroundScheduler+scheduler
: 线程的优先级设置为 RACSchedulerPriorityDefault,线程名称设置为 org.reactivecocoa.ReactiveObjC.RACScheduler.backgroundScheduler
+currentScheduler:
1
2
3
4
5
6
7
8
9
10
11
+ (RACScheduler *)currentScheduler {
RACScheduler *scheduler = NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey];
if (scheduler != nil) return scheduler;
if ([self.class isOnMainThread]) return RACScheduler.mainThreadScheduler;
return nil;
}
+ (BOOL)isOnMainThread {
return [NSOperationQueue.currentQueue isEqual:NSOperationQueue.mainQueue] || [NSThread isMainThread];
}
该方法是从当前线程的线程字典获取对应的 RACScheduler 对象,如果为 nil 则判断当前线程是否为主线程,如果是在主线程上,就返回 mainThreadScheduler。如果既不在主线程上,则返回 nil。
+immediateScheduler:
1
2
3
4
5
6
7
8
9
+ (RACScheduler *)immediateScheduler {
static dispatch_once_t onceToken;
static RACScheduler *immediateScheduler;
dispatch_once(&onceToken, ^{
immediateScheduler = [[RACImmediateScheduler alloc] init];
});
return immediateScheduler;
}
这是一个单例方法,返回了 RACImmediateScheduler 对象
+mainThreadScheduler
1
2
3
4
5
6
7
8
9
+ (RACScheduler *)mainThreadScheduler {
static dispatch_once_t onceToken;
static RACScheduler *mainThreadScheduler;
dispatch_once(&onceToken, ^{
mainThreadScheduler = [[RACTargetQueueScheduler alloc] initWithName:@"org.reactivecocoa.ReactiveObjC.RACScheduler.mainThreadScheduler" targetQueue:dispatch_get_main_queue()];
});
return mainThreadScheduler;
}
同样是一个单例方法,返回的是 RACTargetQueueScheduler 对象,并将其名称赋值为 org.reactivecocoa.ReactiveObjC.RACScheduler.mainThreadScheduler
+subscriptionScheduler
1
2
3
4
5
6
7
8
9
+ (RACScheduler *)subscriptionScheduler {
static dispatch_once_t onceToken;
static RACScheduler *subscriptionScheduler;
dispatch_once(&onceToken, ^{
subscriptionScheduler = [[RACSubscriptionScheduler alloc] init];
});
return subscriptionScheduler;
}
也是一个单例方法,返回了 RACSubscriptionScheduler 对象。
RACScheduler 还提供了5个实例方法:
1
2
3
4
5
6
7
8
9
- (nullable RACDisposable *)schedule:(void (^)(void))block;
- (nullable RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block;
- (nullable RACDisposable *)afterDelay:(NSTimeInterval)delay schedule:(void (^)(void))block;
- (nullable RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(void (^)(void))block;
- (nullable RACDisposable *)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock;
schedule:
: 传入 block 并根据相关条件来触发 blockafter:schedule:
: 延迟执行 block,传入的延迟时间是 NSDate 类型afterDelay:schedule:
: 延迟执行 block,传入的延迟时间是 NSTimeInterval 类型after:repeatingEvery:withLeeway:schedule:
: 创建定时任务,循环间隔由参数 interval 决定scheduleRecursiveBlock:
: 递归触发 block
上面5个方法除了 -scheduleRecursiveBlock:addingToDisposable:
都是由子类进行具体实现
-scheduleRecursiveBlock:addingToDisposable:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
- (void)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock addingToDisposable:(RACCompoundDisposable *)disposable {
@autoreleasepool {
RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
[disposable addDisposable:selfDisposable];
__weak RACDisposable *weakSelfDisposable = selfDisposable;
RACDisposable *schedulingDisposable = [self schedule:^{
@autoreleasepool {
// 已经触发之后,weakSelfDisposable 已经没有作用,故移除
[disposable removeDisposable:weakSelfDisposable];
}
if (disposable.disposed) return;
void (^reallyReschedule)(void) = ^{
if (disposable.disposed) return;
[self scheduleRecursiveBlock:recursiveBlock addingToDisposable:disposable];
};
// Protects the variables below.
//
// This doesn't actually need to be __block qualified, but Clang
// complains otherwise. :C
__block NSLock *lock = [[NSLock alloc] init];
lock.name = [NSString stringWithFormat:@"%@ %s", self, sel_getName(_cmd)];
__block NSUInteger rescheduleCount = 0;
// 同步操作执行完后会赋值为YES,然后执行reallyReschedule闭包
__block BOOL rescheduleImmediately = NO;
@autoreleasepool {
recursiveBlock(^{
[lock lock];
BOOL immediate = rescheduleImmediately;
if (!immediate) ++rescheduleCount;
[lock unlock];
if (immediate) reallyReschedule();
});
}
[lock lock];
NSUInteger synchronousCount = rescheduleCount;
rescheduleImmediately = YES;
[lock unlock];
for (NSUInteger i = 0; i < synchronousCount; i++) {
reallyReschedule();
}
}];
[selfDisposable addDisposable:schedulingDisposable];
}
}
上面这段函数实现中有几个关键的变量/参数:
reallyReschedule:递归执行函数的闭包
recursiveBlock:函数传参,该闭包的参数也是一个闭包(block),recursiveBlock 执行完之后会触发传入的闭包 block
rescheduleCount:递归的次数
rescheduleImmediately:是否立即执行递归闭包 reallyReschedule
主要流程:
- 初始化相关变量之后,执行 recursiveBlock ,第一次 rescheduleImmediately 为 NO,rescheduleCount 递增
- recursiveBlock 执行完后 rescheduleImmediately 赋值为 YES
- 递归执行 reallyReschedule 闭包 rescheduleCount 次
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
- (void)performAsCurrentScheduler:(void (^)(void))block {
NSCParameterAssert(block != NULL);
// If we're using a concurrent queue, we could end up in here concurrently,
// in which case we *don't* want to clear the current scheduler immediately
// after our block is done executing, but only *after* all our concurrent
// invocations are done.
RACScheduler *previousScheduler = RACScheduler.currentScheduler;
NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = self;
@autoreleasepool {
block();
}
if (previousScheduler != nil) {
NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = previousScheduler;
} else {
[NSThread.currentThread.threadDictionary removeObjectForKey:RACSchedulerCurrentSchedulerKey];
}
}
- 执行 block 之前将当前线程的 scheduler 保存下来为 previousScheduler,然后设置自己为新的 scheduler
- 执行完 block 之后恢复现场,如果 previousScheduler 不为空,则重新复制为当前线程的 scheduler,否则从 threadDictionary 移除 RACSchedulerCurrentSchedulerKey 对应的对象,也就是当前 scheduler
RACImmediateScheduler
RACImmediateScheduler 是一个私有类,主要特点是将加入的 block 立即(Immediate)进行调度
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);
block();
return nil;
}
- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block {
NSCParameterAssert(date != nil);
NSCParameterAssert(block != NULL);
[NSThread sleepUntilDate:date];
block();
return nil;
}
- (RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(void (^)(void))block {
NSCAssert(NO, @"+[RACScheduler immediateScheduler] does not support %@.", NSStringFromSelector(_cmd));
return nil;
}
- (RACDisposable *)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock {
for (__block NSUInteger remaining = 1; remaining > 0; remaining--) {
recursiveBlock(^{
remaining++;
});
}
return nil;
}
-schedule
方法会立即触发传入的 block;-after:schedule:
方法会将当前线程休眠到指定时间后执行 block;-after:repeatingEvery:withLeeway:schedule:
不支持-scheduleRecursiveBlock:
循环不断执行传入的 block
RACQueueScheduler
1
2
3
4
5
6
7
8
9
10
11
12
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);
RACDisposable *disposable = [[RACDisposable alloc] init];
dispatch_async(self.queue, ^{
if (disposable.disposed) return;
[self performAsCurrentScheduler:block];
});
return disposable;
}
-schedule
在 self.queue 队列中异步调用了 -performAsCurrentScheduler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block {
NSCParameterAssert(date != nil);
NSCParameterAssert(block != NULL);
RACDisposable *disposable = [[RACDisposable alloc] init];
dispatch_after([self.class wallTimeWithDate:date], self.queue, ^{
if (disposable.disposed) return;
[self performAsCurrentScheduler:block];
});
return disposable;
}
+ (dispatch_time_t)wallTimeWithDate:(NSDate *)date {
NSCParameterAssert(date != nil);
double seconds = 0;
double frac = modf(date.timeIntervalSince1970, &seconds);
struct timespec walltime = {
.tv_sec = (time_t)fmin(fmax(seconds, LONG_MIN), LONG_MAX),
.tv_nsec = (long)fmin(fmax(frac * NSEC_PER_SEC, LONG_MIN), LONG_MAX)
};
return dispatch_walltime(&walltime, 0);
}
- 调用 wallTimeWithDate 方法将 NSDate 转化成 dispatch_time_t
- 调用 dispatch_after 将 block 放进队列 self.queue 中延迟执行
- block 执行过程中是通过
-performAsCurrentScheduler:
方法触发,触发前判断 disposable 是否被取消,取消则直接返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- (RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(void (^)(void))block {
NSCParameterAssert(date != nil);
NSCParameterAssert(interval > 0.0 && interval < INT64_MAX / NSEC_PER_SEC);
NSCParameterAssert(leeway >= 0.0 && leeway < INT64_MAX / NSEC_PER_SEC);
NSCParameterAssert(block != NULL);
uint64_t intervalInNanoSecs = (uint64_t)(interval * NSEC_PER_SEC);
uint64_t leewayInNanoSecs = (uint64_t)(leeway * NSEC_PER_SEC);
dispatch_source_t timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, self.queue);
dispatch_source_set_timer(timer, [self.class wallTimeWithDate:date], intervalInNanoSecs, leewayInNanoSecs);
dispatch_source_set_event_handler(timer, block);
dispatch_resume(timer);
return [RACDisposable disposableWithBlock:^{
dispatch_source_cancel(timer);
}];
}
该方法是通过 GCD 创建定时任务,然后通过 dispatch_source_set_event_handler 把参数 block 和 计时器关联起来,任务被取消的时候取消对应的定时器
RACTargetQueueScheduler
RACTargetQueueScheduler 继承于 RACQueueScheduler,提供了一新的初始化方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
- (instancetype)initWithName:(NSString *)name targetQueue:(dispatch_queue_t)targetQueue {
NSCParameterAssert(targetQueue != NULL);
if (name == nil) {
name = [NSString stringWithFormat:@"org.reactivecocoa.ReactiveObjC.RACTargetQueueScheduler(%s)", dispatch_queue_get_label(targetQueue)];
}
dispatch_queue_t queue = dispatch_queue_create(name.UTF8String, DISPATCH_QUEUE_SERIAL);
if (queue == NULL) return nil;
dispatch_set_target_queue(queue, targetQueue);
return [super initWithName:name queue:queue];
}
在初始化方法创建了串行队列 queue,然后通过 dispatch_set_target_queue 把 targetQueue 和 queue 关联起来
dispatch_set_target_queue 有2个作用:
初始化方法内 queue 是通过 dispatch_queue_create 创建,无法设置优先级,dispatch_set_target_queue 可以将 queue 优先级设置为 targetQueue 的优先级
设置队列的层次体系,可以理解为 queue 中的任务会派发给 targetQueue;比如如果 targetQueue 是 DISPATCH_QUEUE_SERIAL 串行队列,则 queue 中的任务也是串行执行;如果多个 queue 都指定同一个 targetQueue 串行队列,那么多个 queue 的任务是同步执行的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25- (void)testSetTarget {
dispatch_queue_t targetQueue = dispatch_queue_create("targetQueue", DISPATCH_QUEUE_SERIAL);
dispatch_queue_t queue1 = dispatch_queue_create("queue1", DISPATCH_QUEUE_SERIAL);
dispatch_queue_t queue2 = dispatch_queue_create("queue2", DISPATCH_QUEUE_CONCURRENT);
dispatch_set_target_queue(queue1, targetQueue);
dispatch_set_target_queue(queue2, targetQueue);
dispatch_async(queue1, ^{
NSLog(@"1. queue1 excute");
});
dispatch_async(queue1, ^{
[NSThread sleepForTimeInterval:1.f];
NSLog(@"2. queue1 excute");
});
dispatch_async(queue2, ^{
NSLog(@"1. queue2 excute");
});
dispatch_async(queue2, ^{
NSLog(@"2. queue2 excute");
});
dispatch_async(targetQueue, ^{
NSLog(@"target queue");
});
}1
2
3
4
52019-03-31 19:47:32.441322+0800 AppTest[42843:1630586] 1. queue1 excute
2019-03-31 19:47:33.446694+0800 AppTest[42843:1630586] 2. queue1 excute
2019-03-31 19:47:33.446917+0800 AppTest[42843:1630586] 1. queue2 excute
2019-03-31 19:47:33.447003+0800 AppTest[42843:1630586] 2. queue2 excute
2019-03-31 19:47:33.447095+0800 AppTest[42843:1630586] target queue
RACSubscriptionScheduler
相对于父类,主要添加了一个私有的属性:
1
@property (nonatomic, strong, readonly) RACScheduler *backgroundScheduler;
在其初始化的时候会创建 backgroundScheduler
1
2
3
4
5
6
7
- (instancetype)init {
self = [super initWithName:@"org.reactivecocoa.ReactiveObjC.RACScheduler.subscriptionScheduler"];
_backgroundScheduler = [RACScheduler scheduler];
return self;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);
if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];
block();
return nil;
}
- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block {
RACScheduler *scheduler = RACScheduler.currentScheduler ?: self.backgroundScheduler;
return [scheduler after:date schedule:block];
}
- (RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(void (^)(void))block {
RACScheduler *scheduler = RACScheduler.currentScheduler ?: self.backgroundScheduler;
return [scheduler after:date repeatingEvery:interval withLeeway:leeway schedule:block];
}
其他的方法大体逻辑都是判断当前线程有没有对应的 RACScheduler,如果有任务则在当前线程对应的 RACScheduler 执行,若没有则在 backgroundScheduler 上执行
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK