1

Golang 笔记 - Fan-in

 2 years ago
source link: https://greenhathg.github.io/2022/04/23/Golang%E7%AC%94%E8%AE%B0-Fan-in/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

GreenHatHG の Blog

Golang 笔记 - Fan-in

发表于 2022-04-23|更新于 2022-04-23|编程
字数总计:938|阅读时长:5 分钟 | 阅读量:| 评论数:0

来源:https://go.dev/talks/2012/concurrency.slide

  • The boring function runs, like a boring party guest.

    func boring(msg string) {
    for i := 0; ; i++ {
    fmt.Println(msg, i)
    time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
    }

    func main() {
    go boring("boring!")
    fmt.Println("I'm listening.")
    time.Sleep(2 * time.Second)
    fmt.Println("You're boring; I'm leaving.")
    }
  • A channel connects the main and boring goroutines so they can communicate.

    func boring(msg string, c chan<- string) {
    for i := 0; ; i++ {
    // Expression to be sent can be any suitable value.
    c <- fmt.Sprintf("%s %d", msg, i)
    time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
    }

    func main() {
    c := make(chan string)
    go boring("boring!", c)
    for i := 0; i < 5; i++{
    fmt.Printf("You say: %q\n", <-c)
    }
    fmt.Println("You're boring; I'm leaving.")
    }
  • Generator Pattern: function that returns a channel

    // Returns receive-only channel of strings.
    func boring(msg string) <-chan string {
    c := make(chan string)
    // We launch the goroutine from inside the function.
    go func() {
    for i := 0; ; i++ {
    c <- fmt.Sprintf("%s %d", msg, i)
    time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
    }()
    return c // Return the channel to the caller.

    }

    func main() {
    c := boring("boring!") // Function returning a channel.
    for i := 0; i < 5; i++{
    fmt.Printf("You say: %q\n", <-c)
    }
    fmt.Println("You're boring; I'm leaving.")
    }
  • Multiplexing: Fan-in

    func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() { for { c <- <-input1 } }()
    go func() { for { c <- <-input2 } }()
    return c
    }

    func main() {
    c := fanIn(boring("Joe"), boring("Ann"))
    for i := 0; i < 10; i++{
    fmt.Println(<-c)
    }
    fmt.Println("You're both boring; I'm leaving.")
    }
  • Fan-in using select

    func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() {
    for {
    select {
    case s := <-input1: c <- s
    case s := <-input2: c <- s
    }
    }
    }()
    return c
    }
  • keep sequencing: Each speaker must wait for a go-ahead.

    type Message struct {
    msg string
    wait chan bool
    }

    func boring(msg string) <-chan Message {
    rand.Seed(time.Now().UnixNano())
    waitForIt := make(chan bool) // Shared between all messages.
    c := make(chan Message)
    go func() {
    for i := 0; ; i++ {
    c <- Message{ fmt.Sprintf("%s: %d", msg, i), waitForIt }
    time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    <-waitForIt
    }
    }()
    return c
    }

    func main() {
    c := fanIn(boring("Joe"), boring("Ann"))
    for i := 0; i < 5; i++{
    msg1 := <-c
    fmt.Println(msg1.msg)
    msg2 := <-c
    fmt.Println(msg2.msg)
    msg1.wait <- true
    msg2.wait <- true
    }
    fmt.Println("You're both boring; I'm leaving.")
    }
  • Timeout:

    func main() {
    c := fanIn(boring("Joe"), boring("Ann"))
    for{
    select {
    case s := <-c: fmt.Println(s.msg)
    case <-time.After(1*time.Second):
    fmt.Println("You're too slow.")
    return
    }
    }
    }
  • Example -Google Search:

    var (
    Web = fakeSearch("web")
    Image = fakeSearch("image")
    Video = fakeSearch("video")
    )

    type Result string
    type Search func(query string) Result

    func fakeSearch(kind string) Search {
    return func(query string) Result {
    time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
    return Result(fmt.Sprintf("%s result for %q\n", kind, query))
    }
    }

    func Google(query string) (results []Result) {
    c := make(chan Result)
    go func() { c <- Web(query) } ()
    go func() { c <- Image(query) } ()
    go func() { c <- Video(query) } ()

    timeout := time.After(80 * time.Millisecond)
    for i := 0; i < 3; i++ {
    select {
    case result := <-c:
    results = append(results, result)
    case <-timeout: //Don't wait for slow servers.
    fmt.Println("timed out")
    return
    }
    }
    return
    }

    func main() {
    rand.Seed(time.Now().UnixNano())
    start := time.Now()
    results := Google("golang")
    elapsed := time.Since(start)
    fmt.Println(results)
    fmt.Println(elapsed)
    }
    • Avoid discarding results from slow servers: Replicate the servers. Send requests to multiple replicas, and use the first response. More
      func First(query string, replicas ...Search) Result {
      c := make(chan Result)
      searchReplica := func(i int) { c <- replicas[i](query) }
      for i := range replicas {
      go searchReplica(i)
      }
      return <-c
      }

      func Google(query string) (results []Result) {
      c := make(chan Result)
      go func() { c <- First(query, fakeSearch("web"), fakeSearch("web")) } ()
      go func() { c <- First(query, fakeSearch("image"), fakeSearch("image")) } ()
      go func() { c <- First(query, fakeSearch("video"), fakeSearch("video")) } ()

      timeout := time.After(80 * time.Millisecond)
      for i := 0; i < 3; i++ {
      select {
      case result := <-c:
      results = append(results, result)
      case <-timeout: //Don't wait for slow servers.
      fmt.Println("timed out")
      return
      }
      }
      return
      }

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK