8

我没能实现始终在一个线程上运行 task

 2 years ago
source link: https://www.newbe.pro/Others/0x029-I-can-not-manage-to-always-run-task-on-one-thread/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

我没能实现始终在一个线程上运行 task

2023-04-01 1

前文我们总结了在使用常驻任务实现常驻线程时,应该注意的事项。但是我们最终没有提到如何在处理对于带有异步代码的办法。本篇将接受笔者对于该内容的总结。

如何识别当前代码跑在什么线程上

一切开始之前,我们先来使用一种简单的方式来识别当前代码运行在哪种线程上。

最简单的方式就是打印当前线程名称和线程 ID 来识别。

private static void ShowCurrentThread(string work)
{
Console.WriteLine($"{work} - {Thread.CurrentThread.Name} - {Thread.CurrentThread.ManagedThreadId}");
}

通过这段代码,我们可以非常容易的识别三种不同情况下的线程信息。

[Test]
public void ShowThreadMessage()
{
new Thread(() => { ShowCurrentThread("Custom thread work"); })
{
IsBackground = true,
Name = "Custom thread"
}.Start();

Task.Run(() => { ShowCurrentThread("Task.Run work"); });
Task.Factory.StartNew(() => { ShowCurrentThread("Task.Factory.StartNew work"); },
TaskCreationOptions.LongRunning);

Thread.Sleep(TimeSpan.FromSeconds(1));
}
// output
// Task.Factory.StartNew work - .NET Long Running Task - 17
// Custom thread work - Custom thread - 16
// Task.Run work - .NET ThreadPool Worker - 12
  • 自定义线程 Custom thread
  • 线程池线程 .NET ThreadPool Worker
  • 由 Task.Factory.StartNew 创建的新线程 .NET Long Running Task

因此,结合我们之前昙花线程的例子,我们也可以非常简单的看出线程的切换情况:

[Test]
public void ShortThread()
{
new Thread(async () =>
{
ShowCurrentThread("before await");
await Task.Delay(TimeSpan.FromSeconds(0.5));
ShowCurrentThread("after await");
})
{
IsBackground = true,
Name = "Custom thread"
}.Start();
Thread.Sleep(TimeSpan.FromSeconds(1));
}
// output
// before await - Custom thread - 16
// after await - .NET ThreadPool Worker - 6

我们希望在同一个线程上运行 Task 代码

之前我们已经知道了,手动创建线程并控制线程的运行,可以确保自己的代码不会于线程池线程产生竞争,从而使得我们的常驻任务能够稳定的触发。

当时用于演示的错误示例是这样的:

[Test]
public void ThreadWaitTask()
{
new Thread(async () =>
{
ShowCurrentThread("before await");
Task.Run(() =>
{
ShowCurrentThread("inner task");
}).Wait();
ShowCurrentThread("after await");
})
{
IsBackground = true,
Name = "Custom thread"
}.Start();
Thread.Sleep(TimeSpan.FromSeconds(1));
}
// output
// before await - Custom thread - 16
// inner task - .NET ThreadPool Worker - 13
// after await - Custom thread - 16

这个示例可以明显的看出,中间的部分代码是运行在线程池的。这种做法会在线程池资源紧张的时候,导致我们的常驻任务无法触发。

因此,我们需要一种方式来确保我们的代码在同一个线程上运行。

那么接下来我们分析一些想法和效果。

加配!加配!加配!

我们已经知道了,实际上,常驻任务不能稳定触发是因为 Task 会在线程池中运行。那么增加线程池的容量自然就是最直接解决高峰的做法。
因此,如果条件允许的话,直接增加 CPU 核心数实际上是最为有效和简单的方式。

不过这种做法并不适用于一些类库的编写者。比如,你在编写日志类库,那么其实无法欲知用户所处的环境。并且正如大家所见,市面上几乎没有日志类库中由说明让用户只能在一定的 CPU 核心数下使用。

因此,如果您的常驻任务是在类库中,那么我们需要一种更为通用的方式来解决这个问题。

考虑使用同步重载

在 Task 出现之后,很多时候我们都会考虑使用异步重载的方法。这显然不是错误的做法,因为这可以使得我们的代码更加高效,提升系统的吞吐量。但是,如果你想要让 Thread 稳定的在同一个线程上运行,那么你需要考虑使用同步重载的方法。通过同步重载方法,我们的代码将不会出现线程切换到线程池的情况。自然也就实现了我们的目的。

总是使用 TaskCreationOptions.LongRunning

这个办法其实很不实际。因为任何一层没有指定,都会将任务切换到线程池中。

[Test]
public void AlwaysLogRunning()
{
new Thread(async () =>
{
ShowCurrentThread("before await");
Task.Factory.StartNew(() =>
{
ShowCurrentThread("LongRunning task");
Task.Run(() => { ShowCurrentThread("inner task"); }).Wait();
}, TaskCreationOptions.LongRunning).Wait();
ShowCurrentThread("after await");
})
{
IsBackground = true,
Name = "Custom thread"
}.Start();
Thread.Sleep(TimeSpan.FromSeconds(1));
}
// output
// before await - Custom thread - 16
// LongRunning task - .NET Long Running Task - 17
// inner task - .NET ThreadPool Worker - 7
// after await - Custom thread - 16

所以说,这个办法可以用。但其实很怪。

自定义 Scheduler

这是一种可行,但是非常困难的做法。虽然说自定义个简单的 Scheduler 也不是很难,只需要实现几个简单的方法。但要按照我们的需求来实现这个 Scheduler 并不简单。

比如我们尝试实现一个这样的 Scheduler:

注意:这个 Scheduler 并不能正常工作。

class MyScheduler : TaskScheduler
{
private readonly Thread _thread;
private readonly ConcurrentQueue<Task> _tasks = new();

public MyScheduler()
{
_thread = new Thread(() =>
{
while (true)
{
while (_tasks.TryDequeue(out var task))
{
TryExecuteTask(task);
}
}
})
{
IsBackground = true,
Name = "MyScheduler"
};
_thread.Start();
}

protected override IEnumerable<Task> GetScheduledTasks()
{
return _tasks;
}

protected override void QueueTask(Task task)
{
_tasks.Enqueue(task);
}

protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return false;
}
}

上面的代码中,我们期待通过一个单一的线程来执行所有的任务。但实际上它反而是一个非常简单的死锁演示装置。

我们设想运行下面这段代码:

[Test]
public async Task TestLongRunningConfigureAwait()
{
var scheduler = new MyScheduler();
await Task.Factory.StartNew(() =>
{
ShowCurrentThread("BeforeWait");
Task.Factory
.StartNew(() =>
{
ShowCurrentThread("AfterWait");
}
, CancellationToken.None, TaskCreationOptions.None, scheduler)
.Wait();
ShowCurrentThread("AfterWait");
}, CancellationToken.None, TaskCreationOptions.None, scheduler);
}

这段代码中,我们期待,在一个 Task 中运行另外一个 Task。但实际上,这段代码会死锁。

因为,我们的 MyScheduler 中,我们在一个死循环中,不断的从队列中取出任务并执行。但是,我们的任务中,又会调用 Wait 方法。

我们不妨设想这个线程就是我们自己。

  1. 首先,老板交代给你一件任务,你把它放到队列中。
  2. 然后你开始执行这件任务,执行到一半发现,你需要等待第二件任务的执行结果。因此你在这里等着。
  3. 但是第二件任务这个时候也塞到了你的队列中。
  4. 这下好了,你手头的任务在等待你队列里面的任务完成。而你队列的任务只有你才能完成。
  5. 完美卡死。

因此,其实实际上我们需要在 Wait 的时候通知当前线程,此时线程被 Block 了,然后转而从队列中取出任务执行。在 Task 于 ThreadPool 的配合中,是存在这样的机制的。但是,我们自己实现的 MyScheduler 并不能与 Task 产生这种配合。因此需要考虑自定义一个 Task。跟进一步说,我们需要自定义 AsyncMethodBuilder 来实现全套的自定义。

显然者是一项相对高级内容,期待了解的读者,可以通过 UniTask^7 项目来了解如何实现这样的全套自定义。

如果你期望在常驻线程能够稳定的运行你的任务。那么:

  1. 加配,以避免线程池不够用
  2. 考虑在这部分代码中使用同步代码
  3. 可以学习自定义 Task 系统

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK