14

一文读懂Channel设计

 3 years ago
source link: https://studygolang.com/articles/31748
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

在Go中,要理解channel,首先需要认识goroutine。

一、为什么会有goroutine

现代操作系统中为我们提供了三种基本的构造并发程序的方法:多进程、I/O多路复用和多线程。其中最简单的构造方式当属多进程,但是多进程的并发程序,由于对进程控制和进程间通信开销巨大,这样的并发方式往往会很慢。

因此,操作系统提供了更小粒度的运行单元:线程(确切叫法是内核线程)。它是一种运行在进程上下文中的逻辑流,线程之间通过操作系统来调度,其调度模型如下图所示。

ZZvUNz2.png!mobile

多线程的并发方式,相较于多进程而言要快得多。但是由于线程上下文切换总是不可避免的陷入内核态,它的开销依然较大。那么有没有不必陷入内核态的运行载体呢?有,用户级线程。 用户级线程的切换由用户程序自己控制,不需要内核干涉,因此少了进出内核态的消耗。

fA7NVbF.png!mobile

这里的用户级线程就是协程(coroutine),它们的切换由运行时系统来统一调度管理,内核并不知道它的存在。协程是抽象于内核线程之上的对象,一个内核线程可以对应多个协程。但最终的系统调用仍然需要内核线程来完成。注意,线程的调度是操作系统来管理,是一种抢占式调度。而协程不同,协程之间需要合作,会主动交出执行权,是一种协作式调度,这也是为何被称为协程的原因。

Go天生在语言层面支持了协程,即我们常说的goroutine。Go的runtime系统实现的是一种M:N调度模型,通过GMP对象来描述,其中G代表的就是协程,M是线程,P是调度上下文。在Go程序中,一个goroutine就代表着一个最小用户代码执行流,它们也是并发流的最小单元。

二、channel的存在定位

从内存的角度而言,并发模型只分两种:基于共享内存和基于消息通信(内存拷贝)。在Go中,两种并发模型的同步原语均有提供:sync.\ 和atomic.\ 代表的就是基于共享内存;channel代表的就是基于消息通信。而Go提倡后者,它包括三大元素:goroutine(执行体),channel(通信),select(协调)。

Do not communicate by sharing memory; instead, share memory by communicating.

在Go中通过goroutine+channel的方式,可以简单、高效地解决并发问题,channel就是goroutine之间的数据桥梁。

Concurrency is the key to designing high performance network services. Go's concurrency primitives (goroutines and channels) provide a simple and efficient means of expressing concurrent execution.

以下是一个简单的channel使用示例代码。

func goroutineA(ch <-chan int)  {
    fmt.Println("[goroutineA] want a data")
    val := <- ch
    fmt.Println("[goroutineA] received the data", val)
}

func goroutineB(ch chan<- int)  {
    time.Sleep(time.Second*1)
    ch <- 1
    fmt.Println("[goroutineB] send the data 1")
}

func main() {
    ch := make(chan int, 1)
    go goroutineA(ch)
    go goroutineB(ch)
    time.Sleep(2*time.Second)
}

上述过程趣解图如下

yUjEner.png!mobile

bEZZjeB.png!mobile

7JZfim7.png!mobile

YnyU3e.png!mobile

三、channel源码解析

channel源码位于src/go/runtime/chan.go。本章内容分为两部分:channel内部结构和channel操作。

3.1 channel内部结构

ch := make(chan int,2)

对于以上channel的申明语句,我们可以在程序中加入断点,得到ch的信息如下。

2Q3Uv2M.png!mobile

很好,看起来非常的清晰。但是,这些信息代表的是什么含义呢?接下来,我们先看几个重要的结构体。

  • hchan

当我们通过make(chan Type, size)生成channel时,在runtime系统中,生成的是一个hchan结构体对象。源码位于src/runtime/chan.go

type hchan struct {
    qcount   uint           // 循环队列中数据数
    dataqsiz uint           // 循环队列的大小
    buf      unsafe.Pointer // 指向大小为dataqsize的包含数据元素的数组指针
    elemsize uint16         // 数据元素的大小
    closed   uint32         // 代表channel是否关闭   
    elemtype *_type         // _type代表Go的类型系统,elemtype代表channel中的元素类型
    sendx    uint           // 发送索引号,初始值为0
    recvx    uint           // 接收索引号,初始值为0
  recvq    waitq          // 接收等待队列,存储试图从channel接收数据(<-ch)的阻塞goroutines
    sendq    waitq          // 发送等待队列,存储试图发送数据(ch<-)到channel的阻塞goroutines

    lock mutex              // 加锁能保护hchan的所有字段,包括waitq中sudoq对象
}
  • waitq

waitq用于表达处于阻塞状态的goroutines链表信息,first指向链头goroutine,last指向链尾goroutine

type waitq struct {
    first *sudog           
    last  *sudog
}
  • sudug

sudog代表的就是一个处于等待列表中的goroutine对象,源码位于src/runtime/runtime2.go

type sudog struct {
    g *g
    next *sudog
    prev *sudog
    elem unsafe.Pointer // data element (may point to stack)
    c        *hchan // channel
  ...
}

为了更好理解hchan结构体,我们将通过以下代码来理解hchan中的字段含义。

package main

import "time"

func goroutineA(ch chan int) {
    ch <- 100
}

func goroutineB(ch chan int) {
    ch <- 200
}

func goroutineC(ch chan int) {
    ch <- 300
}

func goroutineD(ch chan int) {
    ch <- 300
}

func main() {
    ch := make(chan int, 4)
    for i := 0; i < 4; i++ {
        ch <- i * 10
    }
    go goroutineA(ch)
    go goroutineB(ch)
    go goroutineC(ch)
    go goroutineD(ch)
    // 第一个sleep是为了给上足够的时间让所有goroutine都已启动
    time.Sleep(time.Millisecond * 500)
    time.Sleep(time.Second)
}

打开代码调试功能,将程序运行至断点time.Sleep(time.Second)处,此时得到的chan信息如下。

NNF7Nvz.png!mobile

在该channel中,通过make(chan int, 4)定义的channel大小为4,即dataqsiz的值为4。同时由于循环队列中已经添加了4个元素,所以qcount值也为4。此时,有4个goroutine(A-D)想发送数据给channel,但是由于存放数据的循环队列已满,所以只能进入发送等待列表,即sendq。同时要注意到,此时的发送和接收索引值均为0,即下一次接收数据的goroutine会从循环队列的第一个元素拿,发送数据的goroutine会发送到循环队列的第一个位置。

上述hchan结构可视化图解如下

77RVjyi.png!mobile

3.2 channel操作

将channel操作分为四部分:创建、发送、接收和关闭。

创建

本文的参考Go版本为1.15.2。其channel的创建实现代码位于src/go/runtime/chan.go的makechan方法。

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

  // 发送元素大小限制
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
  // 对齐检查
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }

  // 判断是否会内存溢出
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

  // 为构造的hchan对象分配内存
    var c *hchan
    switch {
  // 无缓冲的channel或者元素大小为0的情况
    case mem == 0:
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        c.buf = c.raceaddr()
  // 元素不包含指针的情况  
    case elem.ptrdata == 0:
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
  // 元素包含指针  
    default:
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

  // 初始化相关参数
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    }
    return c
}

可以看到,makechan方法主要就是检查传送元素的合法性,并为hchan分配内存,初始化相关参数,包括对锁的初始化。

发送

channel的发送实现代码位于src/go/runtime/chan.go的chansend方法。发送过程,存在以下几种情况。

  1. 当发送的channel为nil
if c == nil {
    if !block {
        return false
    }
    gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
    throw("unreachable")
}

往一个nil的channel中发送数据时,调用gopark函数将当前执行的goroutine从running态转入waiting态。

  1. 往已关闭的channel中发送数据
if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

如果向已关闭的channel中发送数据,会引发panic。

  1. 如果已经有阻塞的接收goroutines(即recvq中指向非空),那么数据将被直接发送给接收goroutine。
if sg := c.recvq.dequeue(); sg != nil {
    // Found a waiting receiver. We pass the value we want to send
    // directly to the receiver, bypassing the channel buffer (if any).
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
}

该逻辑的实现代码在send方法和sendDirect中。

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  ... // 省略了竞态代码
    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    dst := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

其中,memmove我们已经在源码系列中遇到多次了,它的目的是将内存中src的内容拷贝至dst中去。另外,注意到goready(gp, skip+1)这句代码,它会使得之前在接收等待队列中的第一个goroutine的状态变为runnable,这样go的调度器就可以重新让该goroutine得到执行。

  1. 对于有缓冲的channel来说,如果当前缓冲区hchan.buf有可用空间,那么会将数据拷贝至缓冲区
if c.qcount < c.dataqsiz {
    qp := chanbuf(c, c.sendx)
    if raceenabled {
        raceacquire(qp)
        racerelease(qp)
    }
    typedmemmove(c.elemtype, qp, ep)
  // 发送索引号+1
    c.sendx++
  // 因为存储数据元素的结构是循环队列,所以当当前索引号已经到队末时,将索引号调整到队头
    if c.sendx == c.dataqsiz {
        c.sendx = 0
    }
  // 当前循环队列中存储元素数+1
    c.qcount++
    unlock(&c.lock)
    return true
}

其中,chanbuf(c, c.sendx)是获取指向对应内存区域的指针。typememmove会调用memmove方法,完成数据的拷贝工作。另外注意到,当对hchan进行实际操作时,是需要调用lock(&c.lock)加锁,因此,在完成数据拷贝后,通过unlock(&c.lock)将锁释放。

  1. 有缓冲的channel,当hchan.buf已满;或者无缓冲的channel,当前没有接收的goroutine
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
    mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

通过getg获取当前执行的goroutine。acquireSudog是先获得当前执行goroutine的线程M,再获取M对应的P,最后将P的sudugo缓存队列中的队头sudog取出(详见源码src/runtime/proc.go)。通过c.sendq.enqueue将sudug加入到channel的发送等待列表中,并调用gopark将当前goroutine转为waiting态。

  • 发送操作会对hchan加锁。
  • 当recvq中存在等待接收的goroutine时,数据元素将会被直接拷贝给接收goroutine。
  • 当recvq等待队列为空时,会判断hchan.buf是否可用。如果可用,则会将发送的数据拷贝至hchan.buf中。
  • 如果hchan.buf已满,那么将当前发送goroutine置于sendq中排队,并在运行时中挂起。
  • 向已经关闭的channel发送数据,会引发panic。

对于无缓冲的channel来说,它天然就是hchan.buf已满的情况,因为它的hchan.buf的容量为0。

package main

import "time"

func main() {
    ch := make(chan int)
    go func(ch chan int) {
        ch <- 100
    }(ch)
    time.Sleep(time.Millisecond * 500)
    time.Sleep(time.Second)
}

在上述示例中,发送goroutine向无缓冲的channel发送数据,但是没有接收goroutine。将断点置于time.Sleep(time.Second),得到此时ch结构如下。

U3Mniuu.png!mobile

可以看到,在无缓冲的channel中,其hchan的buf长度为0,当没有接收groutine时,发送的goroutine将被置于sendq的发送队列中。

接收

channel的接收实现分两种,v :=<-ch对应于chanrecv1,v, ok := <- ch对应于chanrecv2,但它们都依赖于位于src/go/runtime/chan.go的chanrecv方法。

func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

chanrecv的详细代码此处就不再展示,和chansend逻辑对应,具体处理准则如下。

  • 接收操作会对hchan加锁。
  • 当sendq中存在等待发送的goroutine时,意味着此时的hchan.buf已满(无缓存的天然已满),分两种情况(见代码src/go/runtime/chan.go的recv方法):1. 如果是有缓存的hchan,那么先将缓冲区的数据拷贝给接收goroutine,再将sendq的队头sudog出队,将出队的sudog上的元素拷贝至hchan的缓存区。 2. 如果是无缓存的hchan,那么直接将出队的sudog上的元素拷贝给接收goroutine。两种情况的最后都会唤醒出队的sudog上的发送goroutine。
  • 当sendq发送队列为空时,会判断hchan.buf是否可用。如果可用,则会将hchan.buf的数据拷贝给接收goroutine。
  • 如果hchan.buf不可用,那么将当前接收goroutine置于recvq中排队,并在运行时中挂起。
  • 与发送不同的是,当channel关闭时,goroutine还能从channel中获取数据。如果recvq等待列表中有goroutines,那么它们都会被唤醒接收数据。如果hchan.buf中还有未接收的数据,那么goroutine会接收缓冲区中的数据,否则goroutine会获取到元素的零值。

以下是channel关闭之后,接收goroutine的读取示例代码。

func main() {
    ch := make(chan int, 1)
    ch <- 10
    close(ch)
    a, ok := <-ch
    fmt.Println(a, ok)
    b, ok := <-ch
    fmt.Println(b, ok)
    c := <-ch
    fmt.Println(c)
}

//输出如下
10 true
0 false
0

注意:在channel中进行的所有元素转移都伴随着内存的拷贝。

func main() {
    type Instance struct {
        ID   int
        name string
    }

    var ins = Instance{ID: 1, name: "Golang"}

    ch := make(chan Instance, 3)
    ch <- ins

    fmt.Println("ins的原始值:", ins)

    ins.name = "Python"
    go func(ch chan Instance) {
        fmt.Println("channel接收值:", <-ch)
    }(ch)

    time.Sleep(time.Second)
    fmt.Println("ins的最终值:", ins)
}

// 输出结果
ins的原始值: {1 Golang}
channel接收值: {1 Golang}
ins的最终值: {1 Python}

前半段图解如下

JNfuq2.png!mobile

后半段图解如下

rquuuiU.png!mobile

注意,如果把channel传递类型替换为Instance指针时,那么尽管channel存入到buf中的元素已经是拷贝对象了,从channel中取出又被拷贝了一次。但是由于它们的类型是Instance指针,拷贝对象与原始对象均会指向同一个内存地址,修改原有元素对象的数据时,会影响到取出数据。

func main() {
    type Instance struct {
        ID   int
        name string
    }

    var ins = &Instance{ID: 1, name: "Golang"}

    ch := make(chan *Instance, 3)
    ch <- ins

    fmt.Println("ins的原始值:", ins)

    ins.name = "Python"
    go func(ch chan *Instance) {
        fmt.Println("channel接收值:", <-ch)
    }(ch)

    time.Sleep(time.Second)
    fmt.Println("ins的最终值:", ins)
}

// 输出结果
ins的原始值: &{1 Golang}
channel接收值: &{1 Python}
ins的最终值: &{1 Python}

因此,在使用channel时,尽量避免传递指针,如果传递指针,则需谨慎。

关闭

channel的关闭实现代码位于src/go/runtime/chan.go的chansend方法,详细执行逻辑已通过注释写明。

func closechan(c *hchan) {
  // 如果hchan对象为nil,则会引发painc
    if c == nil {
        panic(plainError("close of nil channel"))
    }

  // 对hchan加锁
    lock(&c.lock)
  // 不同多次调用close(c chan<- Type)方法,否则会引发painc
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    if raceenabled {
        callerpc := getcallerpc()
        racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
        racerelease(c.raceaddr())
    }

  // close标志
    c.closed = 1

  // gList代表Go的GMP调度的G集合
    var glist gList

    // 该for循环是为了释放recvq上的所有等待接收sudog
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }

    // 该for循环会释放sendq上的所有等待发送sudog
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
  // 释放sendq和recvq之后,hchan释放锁
    unlock(&c.lock)

  // 将上文中glist中的加入的goroutine取出,让它们均变为runnable(可执行)状态,等待调度器执行
    // 注意:我们上文中分析过,试图向一个已关闭的channel发送数据,会引发painc。
  // 所以,如果是释放sendq中的goroutine,它们一旦得到执行将会引发panic。
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

关于关闭操作,有几个点需要注意一下。

  • 如果关闭已关闭的channel会引发painc。
  • 对channel关闭后,如果有阻塞的读取或发送goroutines将会被唤醒。读取goroutines会获取到hchan的已接收元素,如果没有,则获取到元素零值;发送goroutine的执行则会引发painc。

对于第二点,我们可以很好利用这一特性来实现对程序执行流的控制(类似于sync.WaitGroup的作用),以下是示例程序代码。

func main() {
    ch := make(chan struct{})
    //
    go func() {
        // do something work...
        // when work has done, call close()
        close(ch)
    }()
    // waiting work done
    <- ch
    // other work continue...
}

四、总结

channel是Go中非常强大有用的机制,为了更有效地使用它,我们必须了解它的实现原理,这也是写作本文的目的。

  • hchan结构体有锁的保证,对于并发goroutine而言是安全的
  • channel接收、发送数据遵循FIFO(First In First Out)原语
  • channel的数据传递依赖于内存拷贝
  • channel能阻塞(gopark)、唤醒(goready)goroutine
  • 所谓无缓存的channel,它的工作方式就是直接发送goroutine拷贝数据给接收goroutine,而不通过hchan.buf

另外,可以看到Go在channel的设计上权衡了简单与性能。为了简单性,hchan是有锁的结构,因为有锁的队列会更易理解和实现,但是这样会损失一些性能。考虑到整个 channel 操作带锁的成本较高,其实官方也曾考虑过使用无锁 channel 的设计,但是由于目前已有提案中( https://github.com/golang/go/... ),无锁实现的channel可维护性差、且实际性能测试不具有说服力,而且也不符合Go的简单哲学,因此官方目前为止并没有采纳无锁设计。

在性能上,有一点,我们需要认识到:所谓channel中阻塞goroutine,只是在runtime系统中被blocked,它是用户层的阻塞。而实际的底层内核线程不受影响,它仍然是unblocked的。

参考链接

https://speakerdeck.com/kavya...

https://codeburst.io/diving-d...

https://github.com/talkgo/nig...

有疑问加站长微信联系(非本文作者)

eUjI7rn.png!mobile

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK