2

Golang并发编程——goroutine、channel、sync - 平凡键客

 1 year ago
source link: https://www.cnblogs.com/arvinhuang/p/16437905.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

并发与并行#

并发和并行是有区别的,并发不等于并行。

并发#

两个或多个事件在同一时间不同时间间隔发生。对应在Go中,就是指多个 goroutine 在单个CPU上的交替运行。

并行#

两个或者多个事件在同一时刻发生。对应在Go中,就是指多个 goroutine 在多个CPU上同时运行。

goroutine#

介绍#

goroutine 是 Go 中一种轻量级线程。也称为用户态线程。由 Go 的 runtime 进行管理。Go 的程序会智能地将 goroutine 中的任务合理地分配给每个 CPU。

在程序中,我们只要使用 go 关键字,就可以轻易开启一个 goroutine

建议#

在使用 goroutine 时,以下两个建议可以有效避免 goroutine 泄露。

  1. 调用者清楚 goroutine 什么时候结束
  2. 调用者可以控制 goroutine 的生命周期

来看一个泄露的例子

func leak() {
        ch := make(chan int)
	go func() {
		<-ch//leak 函数阻塞在接受 ch 
		fmt.Println("receive a value")
	}()
}
func main() {
	leak(ch)//函数返回,
}

这个channel将无法被关闭,leak 函数里开启的 goroutine 也永远无法返回,当然,这个例子中 leak 函数返回了,main 函数结束,leak 函数里开启的 goroutine 也就返回了。

1.调用者不清楚什么时候结束,也无法控制 goroutine 的生命周期。只能被动等待 channel 接受信号,然后执行函数逻辑,如你所见,造成的后果便是容易产生 goroutine 泄露。

来看下面一个例子

type Worker struct {
	wg sync.WaitGroup
}

func (w *Worker) Do() {
	w.wg.Add(1)

	go func() {
		defer w.wg.Done()
		//do someting
		time.Sleep(800 * time.Millisecond)
		fmt.Println("finish")
	}()

}

func (w *Worker) Shutdown(ctx context.Context) error {
	ch := make(chan struct{})
	go func() {
		w.wg.Wait()
		close(ch)
	}()

	select {
	case <-ch:
		return nil
	case <-ctx.Done():
		// time out
		// close(ch)
		return errors.New("time out")
	}
}

func main() {
	worker := &Worker{
		wg: sync.WaitGroup{},
	}
	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1000*time.Millisecond))
	defer cancel()
	worker.Do()
	if err := worker.Shutdown(ctx); err != nil {
		fmt.Println(err)
	}
}

有一个 worker 对象,这个对象会做一些耗时操作。我们在 Do() 方法中使用 goroutine 来处理具体逻辑,在开启goroutine 之前调用 wg.Add(1), 然后在 goroutine 的 defer 函数中 调用 wg.Done(),在 Shutdown() 方法中使用 wg.Wait() 来等待 Do() 方法执行结束。在 Shutdown() 方法中,如果 goroutine 执行结束了,就会往 ch channel 中发送消息,底下 select {} 中收到 ch channel 消息后,Shutdown 方法就可以正常返回,函数到此执行结束。如果 Do() 方法执行太长超出了 ctx 的最长时间。Shutdown 会返回 "time out" 异常。返回之前可以进行资源的处理。

在这个例子中调用者可以通过控制上下文控制来控制 Worker 对象的生命周期。

sync.Mutex、_s_ync.RWMutex#

Go 的 sync 包提供了 mutex、RwMutex,分别是互斥锁与读写锁。

在需要共享内存的地方,如果有多个对象同时对这个地方进行读写操作,就会产生竞态条件。我们需要使用程序语言提供的同步原语对读写操作进行保护。互斥锁就是同一时刻一段代码只能被一个线程/协程运行。Mutex 在大量并发的情况下,会造成锁等待,对性能的影响比较大。在读多写少的场景下可以使用读写锁。读写锁主要遵循以下原则:

  1. 读写锁的读锁可以重入,在已经有读锁的情况下,可以继续加读锁。
  2. 在读锁没有全部解锁时,写操作会阻塞直到所有读锁解锁。
  3. 在写锁没有解锁时,其他协程的读写操作都会被阻塞,直到写锁解锁。

下面是一个互斥锁简单示例。在需要访问共享资源的地方使用 Lock 和 Unlock 方法。表示这部分操作属于“原子操作”。使用时需要注意锁粒度。我们要尽可能的减小锁粒度。锁粒度小了,锁竞争就少。对程序的性能影响就小。

var l sync.Mutex
var a string

func f() {
	a = "hello, world"
	l.Unlock()
}

func main() {
	l.Lock()
	go f()
	l.Lock()
	print(a)
}

sync/atomic#

sync/atomic 提供了用于实现同步算法的底层原子内存原语

copy-on-write 思路在微服务降级或者 local cache 经常使用。我们可以使用 atomic 来实现。atmic 依赖于原子 CPU 指令而不是依赖外部锁,性能不俗。

type NumberArray struct {
	array []int
}

func main() {
	var atomic atomic.Value

	go func() {
		var i int
		for {
			i++
			numArray := &NumberArray{
				array: []int{i, i + 1, i + 2, i + 3},
			}
			atomic.Store(numArray)
			time.Sleep(100 * time.Millisecond)
		}
	}()

	time.Sleep(500 * time.Millisecond) //先让数据更新

	var wg sync.WaitGroup
	for n := 0; n < 100000; n++ {
		wg.Add(1)
		time.Sleep(100 * time.Millisecond)
		go func() {
			numArray := atomic.Load()
			fmt.Println(numArray)
			wg.Done()
		}()
	}
	wg.Wait()
}

errgroup#

errgroup 为处理公共任务的子任务的 goroutine 组提供同步、错误传播和上下文取消。

https://github.com/go-kratos/kratos/blob/main/app.go

func (a *App) Run() error {
	instance, err := a.buildInstance()
	if err != nil {
		return err
	}
	eg, ctx := errgroup.WithContext(NewContext(a.ctx, a))
	wg := sync.WaitGroup{}
	for _, srv := range a.opts.servers {
		srv := srv
		eg.Go(func() error {
			<-ctx.Done() // wait for stop signal
			stopCtx, cancel := context.WithTimeout(NewContext(a.opts.ctx, a), a.opts.stopTimeout)
			defer cancel()
			return srv.Stop(stopCtx)
		})
		wg.Add(1)
		eg.Go(func() error {
			wg.Done()
			return srv.Start(NewContext(a.opts.ctx, a))
		})
	}
	wg.Wait()
	if a.opts.registrar != nil {
		rctx, rcancel := context.WithTimeout(ctx, a.opts.registrarTimeout)
		defer rcancel()
		if err := a.opts.registrar.Register(rctx, instance); err != nil {
			return err
		}
		a.lk.Lock()
		a.instance = instance
		a.lk.Unlock()
	}
	c := make(chan os.Signal, 1)
	signal.Notify(c, a.opts.sigs...)
	eg.Go(func() error {
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-c:
				if err := a.Stop(); err != nil {
					a.opts.logger.Errorf("failed to stop app: %v", err)
					return err
				}
			}
		}
	})
	if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
		return err
	}
	return nil
}

channels#

channel 是 Go 语言中一种类型安全的消息队列,充当两个 goroutine 之间的通道,通过它可以进行任意资源的的交换。同时通过 channel 实现 Go 的同步机制。

无缓冲通道#

当创建的 channel 没有缓冲时,称为无缓冲通道。无缓冲管道必须读写同时操作才会有效果,如果只进行读或者只进行写那么会被阻塞,等待另外一方的操作。

缓冲通道#

创建的 channel 具有缓冲时,称为缓冲通道。缓冲通道是固定容量的先进先出(FIFO)队列。容量在队列创建的时候就已经固定,运行是无法更改。消费者从队列中取出元素并处理它们。如果队列为空并且消费者无事可做,就会发生阻塞,直到生产者放入一个元素。如果队列已满,并且消费者未开始消费,则会发生阻塞,知道消费者消费一个元素。

不论是无缓冲通道还是缓冲通道,都不能往一个已关闭的 channel 发送消息,否则程序会直接 panic ,因此,最好是由发送端进行关闭 channel。

func main() {
	ch := make(chan int)
	close(ch)
	fmt.Println(<-ch)//0
	//close(ch)  //panic: close of closed channel
	//ch <- 2  //panic: send on closed channel

	chs := make(chan int, 2)
	chs <- 1
	chs <- 3
	close(chs)
	fmt.Println(<-chs)
	fmt.Println(<-chs)
	fmt.Println(<-chs)//0
	// chs <- 2 //panic: send on closed channel
}

关于channel 还可以查看这篇文章 polarisxu:无缓冲和有缓冲通道


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK