29

[快速入门]Go语言的CSP并发模型

 3 years ago
source link: https://studygolang.com/articles/29141
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Go语言实现了一下两种并发形式:

第一种是大家普遍认知的: 多线程共享内存 。其实就许多主流编程语言中的多线程开发。

另外一种是Go语言特有的,也是Go语言推荐的: CSP(communicating sequential processes)并发模型 。该方式是Go语言最大的两个亮点goroutine和chan,二者合体的典型应用。

CSP 是 Communicating Sequential Process 的简称,中文可以叫做通信顺序进程,是一种并发编程模型,是一个很强大的并发数据模型,是上个世纪七十年代提出的,用于描述两个独立的并发实体通过共享的通讯 channel (管道)进行通信的并发模型。

Go语言其实只用到了 CSP 的很小一部分,即理论中的 Process/Channel(对应到语言中的 goroutine/channel):这两个并发原语之间没有从属关系, Process 可以订阅任意个 Channel,Channel 也并不关心是哪个 Process 在利用它进行通信;Process 围绕 Channel 进行读写,形成一套有序阻塞和可预测的并发模型。

相信大家一定见过一句话:

Do not communicate by sharing memory; instead, share memory by communicating.

不要通过共享内存来通信,而要通过通信来实现内存共享。

这就是 Go 的并发哲学,它依赖 CSP 模型,基于 channel 实现。

channel

channel的创建

channel 字面意义是 “通道”,类似于 Linux 中的管道。声明 channel 的语法如下:

chan T          // 可以接收和发送类型为 T 的数据
chan<- float64  // 只可以用来发送 float64 类型的数据
<-chan int      // 只可以用来接收 int 类型的数据
复制代码

使用make初始化Channel,并且可以设置容量:

make(chan int, 100)
复制代码

因为 channel 是一个引用类型,所以在它被初始化之前,它的值是 nil,channel 使用 make 函数进行初始化。可以向它传递一个 int 值,代表 channel 缓冲区的大小(容量),构造出来的是一个缓冲型的 channel;不传或传 0 的,构造的就是一个非缓冲型的 channel。

Channel 分为两种:带缓冲、不带缓冲。对不带缓冲的 channel 进行的操作实际上可以看作 “同步模式”,带缓冲的则称为 “异步模式”。

同步模式 下,发送方和接收方要同步就绪,只有在两者都 ready 的情况下,数据才能在两者间传输。否则,任意一方先行进行发送或接收操作,都会被挂起,等待另一方的出现才能被唤醒。

IzUvqqU.png!web

异步模式 下,在缓冲槽可用的情况下(有剩余容量),发送和接收操作都可以顺利进行。否则,操作的一方(如写入)同样会被挂起,直到出现相反操作(如接收)才会被唤醒。

BFN7V3i.png!web

代码示例

//这里定义两个函数,下面分别验证同步模式执行以及异步模式执行的效果
func service() {
	time.Sleep(time.Millisecond * 30)
	return "Done"
}
func otherTask() {
	fmt.Println("this is other task B")
	time.Sleep(time.Millisecond * 100)
	fmt.Println("Task B is done")
}
复制代码

同步模式执行

func AsyncService() chan string { 
	//阻塞模式,即A将信息放进channel直到有人读取,否则将一直阻塞	
	retCh := make(chan string) 
	go func () {
		ret := service()
		fmt.Println("service return result")
		retCh <- ret 
		fmt.Println("service exited")
	}()
	return retCh
}

//单元测试
func TestAsynService(t *testing.T) {
	retCh := AsyncService()
	otherTask()
	fmt.Println(<-retCh)
	time.Sleep(time.Second * 1)
}
复制代码

单测结果运行如下,可以看出等到当othertask执行完开始从chan中取数据时协程才继续向下执行,在这之前一直处于挂起状态

this is other task B
service return result
Task B is done
Done
service exited
复制代码

异步模式执行

func AsyncService() chan string { 
	retCh := make(chan string,1) //buffer模式,非阻塞 丢进channel就继续向下执行
	go func () {
		ret := service()
		fmt.Println("service return result")
		retCh <- ret 
		fmt.Println("service exited")
	}()
	return retCh
}

func TestAsynService(t *testing.T) {
	retCh := AsyncService()
	otherTask()
	fmt.Println(<-retCh)
	time.Sleep(time.Second * 1)
}
复制代码

执行结果如下,可以明显的看到这种模式下并没有等待从chan中获取消息,直接向下继续运行

this is other task
service return result
service exited
Task B is done
Done
复制代码

channel的使用

1.send操作

c := make(chan int)
c <- 3
复制代码

注意,往一个已经被close的channel中继续发送数据会导致 run-time panic

2.recive操作

c := make(chan int)
c <- 3
i := <-c
fmt.Println(i) //3
复制代码

从一个nil channel中接收数据会一直被block,直到有数据可以接收;从一个被close的channel中接收数据不会被阻塞,而是立即返回,会返回元素类型的零值(zero value)以及一个代表当前channel状态的bool值。可以通过这个特性判断channel是否关闭

if x, ok := <-ch;ok {    //ok 为bool值,true标识正常接收,false表示通道关闭
    ...
}else{
    ...
} 
复制代码

3.close操作

c := make(chan int)
close(c)
复制代码

所有的channel接受者都会在channel关闭时,立刻从阻塞等待中返回且上述ok值为false(如果有值可取依旧会正常取值)。这个广播机制常被利用,进行向多个订阅者同时发送信号

代码示例

//数据生产者
func dataProducer(ch chan int, wg *sync.WaitGroup) {
	go func() {
		for i := 0; i < 10; i++ {
			ch <- i
		}
		close(ch)	//channel关闭

		wg.Done()
	}()

}

//数据接受者
func dataReceiver(ch chan int, wg *sync.WaitGroup) {
	go func() {
		for {
			if data, ok := <-ch; ok {	//channel关闭后,ok值将变为false
				fmt.Println(data)
			} else {
				break
			}
		}
		wg.Done()
	}()

}

func TestCloseChannel(t *testing.T) {
	var wg sync.WaitGroup
	ch := make(chan int)
	wg.Add(1)
	dataProducer(ch, &wg)
	wg.Add(1)
	dataReceiver(ch, &wg)
	wg.Wait()
复制代码

与switch-case搭配实现选路

select-case语句配合channel可以实现多路选择以及超时控制功能,每个case后面跟一个阻塞事件,当有事件收到响应后则结束等待,如果均没有响应则执行default

//多渠道选择
//原理如下,采用select-case语句 每个case后面跟一个阻塞事件,当有事件收到响应后则结束等待,如果均没有响应则执行default
func TestSwitch(t *testing.T){
	select{
		case ret1 := <-retCH1:
			t.Logf("case 1 return")
		case ret2 := <-retCH2:
			t.Logf("case 2 return")
		default:
			t.Logf("no one return")
	}
}

//超时控制
func TestTimeOut(t *testing.T){
	select {
	case ret := <- retCH1:
		t.Logf("case 1 return")
	case <-time.After(time.Second*1):
		t.Logf("time out")
	}
}
复制代码

欢迎关注我们的微信公众号,每天学习Go知识

FveQFjN.jpg!web

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK