25

Go语言学习 - Chan的工作原理

 3 years ago
source link: https://studygolang.com/articles/29784
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

我们创建了一大堆线程, 现在我们想要实现线程间的同步, 这其中的关键就是chan(通道)的使用, 如果没有通道, 你应该怎么去做线程间同步呢? time.Sleep吗?

Introduction

type hchan struct {
  dataq_size uint            // 缓冲槽大小
  buf        unsafe.Pointer  // 缓冲槽本体
  elem_type  *_type          // 槽内数据类型
}
复制代码
yYVNbmV.png!web

缓冲槽的工作方式就是上图那样, 每当你往通道里写消息, 消息会先存到缓冲槽里, 而后才被取出来. 这种常规的工作模式也叫异步模式, 因为收发工作不是同步进行的, 你可以先发, 发完你走人, 随后收件人再去管道里取.

同样还有一种用法是不设置缓冲槽, 或者说你直接把缓冲槽大小设置成0, 这种工作模式下, 收发双方必须同时守在管道旁, 否则先到的人一定会堵塞在管道哪儿, 等待后到的人, 然后通信才能开始, 这种也叫同步模式

仔细想一想通道给我们带来了什么, 如果将通道的功能点拆开, 通道的核心功能点就是: 可以在两个不同的G之间相互发消息, 同时还有一套阻塞/唤醒的机制.

通道的工作模式

首先分析一个关键点, 你需要知道有哪些G正守着这个通道, 然后把管道里的参数拷贝给这些堵塞着的G, 最后去解除他们的阻 . 有了这个前提条件, 通道工作围绕的对象就一定是G,

type hchan struct {
  recv_q waitq  // 接收者队列
  send_q waitq  // 发送者队列
}

type waitq struct {
  first *sudog  
  last  *sudog
}

type sudog struct {
  g     *g               // 想要收发消息的g本体
  elem  unsafe.Pointer   // 要发的消息本体
}
复制代码

每个通道都会维护一个发送者队里以及一个接收者队列, 队列里的元素是一个包装过的G(G本体+消息). 我们通过一个发送与接收的过程, 先说说同步通道是如何工作的, 异步通道于此类似

收发 - 同步模式

chan <- data 的操作会被编译器翻译成去执行 chansend 函数, 执行的对象是名为 chan 的通道, 携带一个消息结构体. 检查chan的缓冲槽长度为0, 进入通道的同步模式工作:

  • 找一个接收者, 检查chan的接收者队列
  • chan.recv_q为空, 按照同步队列的工作模式, 因为没人接消息, 这个G必须阻塞(沉睡), 你这个G, 连同你的消息, 一起被打包成一个sudog, 放到chan的发送者队列里
    • 然后通过gopark把你放入沉睡模式
  • 但如果有接收者, 取出这个sudog对象, 然后把这条消息拷贝到sudog.elem里面, 这代表我这条消息已经发给你了.
    • 最后因为这个接收者之前因为没收到消息一直在沉睡中, 通过goready唤醒

ok以上说明了几件事, 通道是如何知道消息应该发给谁, 消息是怎么发送过去的, 以及我们看到的阻塞效果是怎么实现的.

仔细想想, 这阻塞? 在某种条件达到以后自动解除阻塞? 这个场景好像在哪里见过? 你小子在暗示 sync.WaitGroup !! 等 wg.Count 变成0了之后自动解除阻塞, wg使用的阻塞效果也正是通过gopark实现的! 这种沉睡/阻塞最大的特点就是, 某个G被放入沉睡以后, 必须由你手动唤醒, 在我们的场景中这个条件就是找到了接收方, 在wg的场景中这个条件就是wg.Count变成0了, 条件一命中, 我手动立刻帮你唤醒并解除阻塞.

收发 - 异步模式

想象一下异步模式与同步模式的区别在哪? 唯一的区别, 仅仅是在管道填满了才会产生堵塞, 不然你发完/收完就走人.

剩下来原理基本一样, chan <- data 操作同样被翻译成 chansend 函数的调用, 发现缓冲槽大小不为0以后进入异步工作模式, 开始检查缓冲槽的剩余舱位

  • 如果还有剩余舱位, 将消息通过memmove拷贝到管道内, 然后如果发现还有接收者正在堵塞中, 通过goready唤醒
  • 如果没有剩余舱位, 通过gopark进入休眠模式, 在被唤醒以后检查有没有数据

关闭

到了这儿你已经对这一套工作模式非常了解了, 所谓的关闭其实就是遍历通道的发送者队列+接收者队列, 在他们的数据区发送一条nil消息, 然后执行goready唤醒他们中的每一个人

Select的工作模式

经常与通道一起出现的就是Select, Select的功能只是: 从所有的通道case中随机挑一个能用的, 否则就一直堵塞直到出现一个能用的. 我们解析一下这种特性是怎么做到的.

随机序

type hselect struct {
  ncases     uint16  // 总数
  poll_order *uint16 // 随机序号
  cases      []scase // 按照初始化顺序的case队列
}
type scase struct {
  c    *hchan // case的本体, 一个通道
  kind uint16 // 通道类型
}
复制代码

一个select在初始化的时候, 然后把所有的通道从chan类型包装成 scase 类型, 添加上一个字段叫做Kind,这个字段可以是"接收者通道"/"发送者通道", 最后还有一个"default"类型通道, 表明这是一个default case.

然后会生成一个随机序号存到poll_order字段中去, 这代表一个随机数, 然后等程序运行到select的位置的时候, 调用 select_go 函数, 开始找可以用的通道:

for i,_ := range [0...ncases] {
    random_case_id := poll_order[i]
    random_chanel  := cases[random_id]
    if check(random_chanel) {
        return 
    }
}
if default_case != nil {
    execute(default_case)
    return
}
复制代码

我们按照以上的方法去执行随机序, 在所有的case都遍历完了以后, 如果没用能用的, 检查有没有能用的default用

沉睡的select

我们已经知道通道的沉睡与唤醒是怎么实现的, 针对select有意思的一点是, 如果子通道被唤醒, 则自己这个selectG也同时被唤醒了. 这点很神奇, 怎么做到的

for i,_ := range [0...ncases] {
    random_case_id := poll_order[i]
    random_chanel  := cases[random_id]
    if random_chanel.kind == recv_chanel {
        random_chanel.recvq.append(selG)
    }
    if random_chanel.kind == send_chanel {
        random_chanel.sendq.append(selG)
    }
}

gopark(selectG)
复制代码

同样的, 我们也是遍历select下的所有通道, 把自己添加到通道的消息队列中去

  • 如果是它是发送类型通道, 那就在它的发送者队列中添加自己这个selectG
  • 如果是它是接收类型通道, 那就在它的接收者队列中添加自己这个selectG

想一想这样做会发生什么, 自己这个SelectG协程会同时出现在很多通道的消息队列里, 其中任何一个通道被 goready 唤醒的时候, 自己这个SelectG也会被通知到, 自己也会跟这个通道一起被唤醒. 这就实现了select会一直堵塞直到其中任何一个通道畅通为止的特性

欢迎关注我们的微信公众号,每天学习Go知识

FveQFjN.jpg!web

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK