控制并发有两种经典的方式,一种是WaitGroup,另一种就是Context。

什么是WaitGroup

它是一种控制并发的方式,它的这种控制并发的方式是控制多个goroutine同时完成。

func main() {
    var wg sync.WaitGroup

    wg.Add(2)
    go func() {
        time.Sleep(2*time.Second)
        fmt.Println("1号完成")
        wg.Done()
    }()
    go func() {
        time.Sleep(2*time.Second)
        fmt.Println("2号完成")
        wg.Done()
    }()
    wg.Wait()
    fmt.Println("好了,大家都干完了,放工")
}

一个很简单的例子,一定要例子中的2个goroutine同时做完,才算是完成,先做好的就要等着其他未完成的,所有的goroutine要都全部完成才可以。

这是一种控制并发的方式,这种尤其适用于,好多个goroutine协同做一件事情的时候,因为每个goroutine做的都是这件事情的一部分,只有全部的goroutine都完成,这件事情才算是完成,这是等待的方式。

在实际的业务种,我们可能会有这么一种场景:需要我们主动的通知某一个goroutine结束。比如我们开启一个后台goroutine一直做事情,比如监控,现在不需要了,就需要通知这个监控goroutine结束,不然它会一直跑,就泄漏了。

chan通知

我们都知道一个goroutine启动后,我们是无法控制他的,大部分情况是等待它自己结束,那么如果这个goroutine是一个不会自己结束的后台goroutine呢?比如监控等,会一直运行的。

这种情况化,一直傻瓜式的办法是全局变量,其他地方通过修改这个变量完成结束通知,然后后台goroutine不停的检查这个变量,如果发现被通知关闭了,就自我结束。

这种方式也可以,但是首先我们要保证这个变量在多线程下的安全,基于此,有一种更好的方式:chan + select 。

func main() {
    stop := make(chan bool)

    go func() {
        for {
            select {
            case <-stop:
                fmt.Println("监控退出,停止了...")
                return
            default:
                fmt.Println("goroutine监控中...")
                time.Sleep(2 * time.Second)
            }
        }
    }()

    time.Sleep(10 * time.Second)
    fmt.Println("可以了,通知监控停止")
    stop<- true
    //为了检测监控过是否停止,如果没有监控输出,就表示停止了
    time.Sleep(5 * time.Second)

}

例子中我们定义一个stop的chan,通知他结束后台goroutine。实现也非常简单,在后台goroutine中,使用select判断stop是否可以接收到值,如果可以接收到,就表示可以退出停止了;如果没有接收到,就会执行default里的监控逻辑,继续监控,只到收到stop的通知。

有了以上的逻辑,我们就可以在其他goroutine种,给stop chan发送值了,例子中是在main goroutine中发送的,控制让这个监控的goroutine结束。

发送了stop<- true结束的指令后,我这里使用time.Sleep(5 * time.Second)故意停顿5秒来检测我们结束监控goroutine是否成功。如果成功的话,不会再有goroutine监控中…的输出了;如果没有成功,监控goroutine就会继续打印goroutine监控中…输出。

这种chan+select的方式,是比较优雅的结束一个goroutine的方式,不过这种方式也有局限性,如果有很多goroutine都需要控制结束怎么办呢?如果这些goroutine又衍生了其他更多的goroutine怎么办呢?如果一层层的无穷尽的goroutine呢?这就非常复杂了,即使我们定义很多chan也很难解决这个问题,因为goroutine的关系链就导致了这种场景非常复杂。

初识Context

上面说的这种场景是存在的,比如一个网络请求Request,每个Request都需要开启一个goroutine做一些事情,这些goroutine又可能会开启其他的goroutine。所以我们需要一种可以跟踪goroutine的方案,才可以达到控制他们的目的,这就是Go语言为我们提供的Context,称之为上下文非常贴切,它就是goroutine的上下文。

下面我们就使用Go Context重写上面的示例。

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    go func(ctx context.Context) {
        for {
            select {
            case <-ctx.Done():
                fmt.Println("监控退出,停止了...")
                return
            default:
                fmt.Println("goroutine监控中...")
                time.Sleep(2 * time.Second)
            }
        }
    }(ctx)

    time.Sleep(10 * time.Second)
    fmt.Println("可以了,通知监控停止")
    cancel()
    //为了检测监控过是否停止,如果没有监控输出,就表示停止了
    time.Sleep(5 * time.Second)
}

重写比较简单,就是把原来的chan stop 换成Context,使用Context跟踪goroutine,以便进行控制,比如结束等。

context.Background() 返回一个空的Context,这个空的Context一般用于整个Context树的根节点。然后我们使用context.WithCancel(parent)函数,创建一个可取消的子Context,然后当作参数传给goroutine使用,这样就可以使用这个子Context跟踪这个goroutine。

在goroutine中,使用select调用<-ctx.Done()判断是否要结束,如果接受到值的话,就可以返回结束goroutine了;如果接收不到,就会继续进行监控。

那么是如何发送结束指令的呢?这就是示例中的cancel函数啦,它是我们调用context.WithCancel(parent)函数生成子Context的时候返回的,第二个返回值就是这个取消函数,它是CancelFunc类型的。我们调用它就可以发出取消指令,然后我们的监控goroutine就会收到信号,就会返回结束。

Context控制多个goroutine

使用Context控制一个goroutine的例子如上,非常简单,下面我们看看控制多个goroutine的例子,其实也比较简单。

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    go watch(ctx,"【监控1】")
    go watch(ctx,"【监控2】")
    go watch(ctx,"【监控3】")

    time.Sleep(10 * time.Second)
    fmt.Println("可以了,通知监控停止")
    cancel()
    //为了检测监控过是否停止,如果没有监控输出,就表示停止了
    time.Sleep(5 * time.Second)
}

func watch(ctx context.Context, name string) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println(name,"监控退出,停止了...")
            return
        default:
            fmt.Println(name,"goroutine监控中...")
            time.Sleep(2 * time.Second)
        }
    }
}

示例中启动了3个监控goroutine进行不断的监控,每一个都使用了Context进行跟踪,当我们使用cancel函数通知取消时,这3个goroutine都会被结束。这就是Context的控制能力,它就像一个控制器一样,按下开关后,所有基于这个Context或者衍生的子Context都会收到通知,这时就可以进行清理操作了,最终释放goroutine,这就优雅的解决了goroutine启动后不可控的问题。

Context接口

Context的接口定义的比较简洁,我们看下这个接口的方法。

type Context interface {
    Deadline() (deadline time.Time, ok bool)

    Done() <-chan struct{}

    Err() error

    Value(key interface{}) interface{}
}

这个接口共有4个方法,了解这些方法的意思非常重要,这样我们才可以更好的使用他们。

Deadline方法是获取设置的截止时间的意思,第一个返回式是截止时间,到了这个时间点,Context会自动发起取消请求;第二个返回值ok==false时表示没有设置截止时间,如果需要取消的话,需要调用取消函数进行取消。

Done方法返回一个只读的chan,类型为struct{},我们在goroutine中,如果该方法返回的chan可以读取,则意味着parent context已经发起了取消请求,我们通过Done方法收到这个信号后,就应该做清理操作,然后退出goroutine,释放资源。

Err方法返回取消的错误原因,因为什么Context被取消。

Value方法获取该Context上绑定的值,是一个键值对,所以要通过一个Key才可以获取对应的值,这个值一般是线程安全的。

以上四个方法中常用的就是Done了,如果Context取消的时候,我们就可以得到一个关闭的chan,关闭的chan是可以读取的,所以只要可以读取的时候,就意味着收到Context取消的信号了,以下是这个方法的经典用法。

 func Stream(ctx context.Context, out chan<- Value) error {
    for {
        v, err := DoSomething(ctx)
        if err != nil {
            return err
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case out <- v:
        }
    }
  }

Context接口并不需要我们实现,Go内置已经帮我们实现了2个,我们代码中最开始都是以这两个内置的作为最顶层的partent context,衍生出更多的子Context。

var (
    background = new(emptyCtx)
    todo       = new(emptyCtx)
)

func Background() Context {
    return background
}

func TODO() Context {
    return todo
}

一个是Background,主要用于main函数、初始化以及测试代码中,作为Context这个树结构的最顶层的Context,也就是根Context。

一个是TODO,它目前还不知道具体的使用场景,如果我们不知道该使用什么Context的时候,可以使用这个。

他们两个本质上都是emptyCtx结构体类型,是一个不可取消,没有设置截止时间,没有携带任何值的Context。

type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
    return
}

func (*emptyCtx) Done() <-chan struct{} {
    return nil
}

func (*emptyCtx) Err() error {
    return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
    return nil
}

这就是emptyCtx实现Context接口的方法,可以看到,这些方法什么都没做,返回的都是nil或者零值。

Context的继承衍生

有了如上的根Context,那么是如何衍生更多的子Context的呢?这就要靠context包为我们提供的With系列的函数了。

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context

这四个With函数,接收的都有一个partent参数,就是父Context,我们要基于这个父Context创建出子Context的意思,这种方式可以理解为子Context对父Context的继承,也可以理解为基于父Context的衍生。

通过这些函数,就创建了一颗Context树,树的每个节点都可以有任意多个子节点,节点层级可以有任意多个。

WithCancel函数,传递一个父Context作为参数,返回子Context,以及一个取消函数用来取消Context。 WithDeadline函数,和WithCancel差不多,它会多传递一个截止时间参数,意味着到了这个时间点,会自动取消Context,当然我们也可以不等到这个时候,可以提前通过取消函数进行取消。

WithTimeout和WithDeadline基本上一样,这个表示是超时自动取消,是多少时间后自动取消Context的意思。

WithValue函数和取消Context无关,它是为了生成一个绑定了一个键值对数据的Context,这个绑定的数据可以通过Context.Value方法访问到,后面我们会专门讲。

大家可能留意到,前三个函数都返回一个取消函数CancelFunc,这是一个函数类型,它的定义非常简单。

type CancelFunc func()

这就是取消函数的类型,该函数可以取消一个Context,以及这个节点Context下所有的所有的Context,不管有多少层级。

WithValue传递元数据

通过Context我们也可以传递一些必须的元数据,这些数据会附加在Context上以供使用。

var key string="name"

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    //附加值
    valueCtx:=context.WithValue(ctx,key,"【监控1】")
    go watch(valueCtx)
    time.Sleep(10 * time.Second)
    fmt.Println("可以了,通知监控停止")
    cancel()
    //为了检测监控过是否停止,如果没有监控输出,就表示停止了
    time.Sleep(5 * time.Second)
}

func watch(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            //取出值
            fmt.Println(ctx.Value(key),"监控退出,停止了...")
            return
        default:
            //取出值
            fmt.Println(ctx.Value(key),"goroutine监控中...")
            time.Sleep(2 * time.Second)
        }
    }
}

在前面的例子,我们通过传递参数的方式,把name的值传递给监控函数。在这个例子里,我们实现一样的效果,但是通过的是Context的Value的方式。

我们可以使用context.WithValue方法附加一对K-V的键值对,这里Key必须是等价性的,也就是具有可比性;Value值要是线程安全的。

这样我们就生成了一个新的Context,这个新的Context带有这个键值对,在使用的时候,可以通过Value方法读取ctx.Value(key)。

记住,使用WithValue传值,一般是必须的值,不要什么值都传递。

Context使用的原则

  1. 不要把Context放在结构体中,要以参数的方式传递
  2. 以Context作为参数的函数方法,应该把Context作为第一个参数,放在第一位。
  3. 给一个函数方法传递Context的时候,不要传递nil,如果不知道传递什么,就使用context.TODO
  4. Context的Value相关方法应该传递必须的数据,不要什么数据都使用这个传递
  5. Context是线程安全的,可以放心的在多个goroutine中传递

Context的使用场景

场景一:RPC调用

在主goroutine上有4个RPC,RPC2/3/4是并行请求的,我们这里希望在RPC2请求失败之后,直接返回错误,并且让RPC3/4停止继续计算。这个时候,就使用的到Context。

package main

import (
    "context"
    "sync"
    "github.com/pkg/errors"
)

func Rpc(ctx context.Context, url string) error {
    result := make(chan int)
    err := make(chan error)

    go func() {
        // 进行RPC调用,并且返回是否成功,成功通过result传递成功信息,错误通过error传递错误信息
        isSuccess := true
        if isSuccess {
            result <- 1
        } else {
            err <- errors.New("some error happen")
        }
    }()

    select {
        case <- ctx.Done():
            // 其他RPC调用调用失败
            return ctx.Err()
        case e := <- err:
            // 本RPC调用失败,返回错误信息
            return e
        case <- result:
            // 本RPC调用成功,不返回错误信息
            return nil
    }
}


func main() {
    ctx, cancel := context.WithCancel(context.Background())

    // RPC1调用
    err := Rpc(ctx, "http://rpc_1_url")
    if err != nil {
        return
    }

    wg := sync.WaitGroup{}

    // RPC2调用
    wg.Add(1)
    go func(){
        defer wg.Done()
        err := Rpc(ctx, "http://rpc_2_url")
        if err != nil {
            cancel()
        }
    }()

    // RPC3调用
    wg.Add(1)
    go func(){
        defer wg.Done()
        err := Rpc(ctx, "http://rpc_3_url")
        if err != nil {
            cancel()
        }
    }()

    // RPC4调用
    wg.Add(1)
    go func(){
        defer wg.Done()
        err := Rpc(ctx, "http://rpc_4_url")
        if err != nil {
            cancel()
        }
    }()

    wg.Wait()
}

当然我这里使用了waitGroup来保证main函数在所有RPC调用完成之后才退出。

在Rpc函数中,第一个参数是一个CancelContext, 这个Context形象的说,就是一个传话筒,在创建CancelContext的时候,返回了一个听声器(ctx)和话筒(cancel函数)。所有的goroutine都拿着这个听声器(ctx),当主goroutine想要告诉所有goroutine要结束的时候,通过cancel函数把结束的信息告诉给所有的goroutine。当然所有的goroutine都需要内置处理这个听声器结束信号的逻辑(ctx->Done())。我们可以看Rpc函数内部,通过一个select来判断ctx的done和当前的rpc调用哪个先结束。

这个waitGroup和其中一个RPC调用就通知所有RPC的逻辑,其实有一个包已经帮我们做好了。errorGroup 具体这个errorGroup包的使用可以看这个包的test例子。

有人可能会担心我们这里的cancel()会被多次调用,context包的cancel调用是幂等的。可以放心多次调用。

我们这里不妨品一下,这里的Rpc函数,实际上我们的这个例子里面是一个“阻塞式”的请求,这个请求如果是使用http.Get或者http.Post来实现,实际上Rpc函数的Goroutine结束了,内部的那个实际的http.Get却没有结束。所以,需要理解下,这里的函数最好是“非阻塞”的,比如是http.Do,然后可以通过某种方式进行中断。

比如像这篇文章 Cancel http.Request using Context 中的这个例子:

func httpRequest(
  ctx context.Context, 
  client *http.Client, 
  req *http.Request, 
  respChan chan []byte, 
  errChan chan error
) {
  req = req.WithContext(ctx)
  tr := &http.Transport{}
  client.Transport = tr
  go func() {
    resp, err := client.Do(req)
    if err != nil {
      errChan <- err
    }
    if resp != nil {
      defer resp.Body.Close()
      respData, err := ioutil.ReadAll(resp.Body)
      if err != nil {
        errChan <- err
      }
      respChan <- respData
    } else {
      errChan <- errors.New("HTTP request failed")
    }
  }()
  for {
    select {
    case <-ctx.Done():
      tr.CancelRequest(req)
      errChan <- errors.New("HTTP request cancelled")
      return
    case <-errChan:
      tr.CancelRequest(req)
      return
    }
  }
}

它使用了http.Client.Do,然后接收到ctx.Done的时候,通过调用transport.CancelRequest来进行结束。

们还可以参考net/dail/DialContext 换而言之,如果你希望你实现的包是“可中止/可控制”的,那么你在你包实现的函数里面,最好是能接收一个Context函数,并且处理了Context.Done。

场景二:PipeLine

pipeline模式就是流水线模型,流水线上的几个工人,有n个产品,一个一个产品进行组装。其实pipeline模型的实现和Context并无关系,没有context我们也能用chan实现pipeline模型。但是对于整条流水线的控制,则是需要使用上Context的。这篇文章Pipeline Patterns in Go例子是非常好的说明。这里就大致对这个代码进行下说明。

runSimplePipeline的流水线工人有三个,lineListSource负责将参数一个个分割进行传输,lineParser负责将字符串处理成int64,sink根据具体的值判断这个数据是否可用。他们所有的返回值基本上都有两个chan,一个用于传递数据,一个用于传递错误。(<-chan string, <-chan error)输入基本上也都有两个值,一个是Context,用于传声控制的,一个是(in <-chan)输入产品的。

我们可以看到,这三个工人的具体函数里面,都使用switch处理了case <-ctx.Done()。这个就是生产线上的命令控制。

func lineParser(ctx context.Context, base int, in <-chan string) (
    <-chan int64, <-chan error, error) {
    ...
    go func() {
        defer close(out)
        defer close(errc)

        for line := range in {

            n, err := strconv.ParseInt(line, base, 64)
            if err != nil {
                errc <- err
                return
            }

            select {
            case out <- n:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out, errc, nil
}

超时请求

我们发送RPC请求的时候,往往希望对这个请求进行一个超时的限制。当一个RPC请求超过10s的请求,自动断开。当然我们使用CancelContext,也能实现这个功能(开启一个新的goroutine,这个goroutine拿着cancel函数,当时间到了,就调用cancel函数)。

鉴于这个需求是非常常见的,context包也实现了这个需求:timerCtx。具体实例化的方法是 WithDeadline 和 WithTimeout。

具体的timerCtx里面的逻辑也就是通过time.AfterFunc来调用ctx.cancel的。

官方的例子:

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()

    select {
    case <-time.After(1 * time.Second):
        fmt.Println("overslept")
    case <-ctx.Done():
        fmt.Println(ctx.Err()) // prints "context deadline exceeded"
    }
}

在http的客户端里面加上timeout也是一个常见的办法

uri := "https://httpbin.org/delay/3"
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
    log.Fatalf("http.NewRequest() failed with '%s'\n", err)
}

ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*100)
req = req.WithContext(ctx)

resp, err := http.DefaultClient.Do(req)
if err != nil {
    log.Fatalf("http.DefaultClient.Do() failed with:\n'%s'\n", err)
}
defer resp.Body.Close()

在http服务端设置一个timeout如何做呢?

package main

import (
    "net/http"
    "time"
)

func test(w http.ResponseWriter, r *http.Request) {
    time.Sleep(20 * time.Second)
    w.Write([]byte("test"))
}


func main() {
    http.HandleFunc("/", test)
    timeoutHandler := http.TimeoutHandler(http.DefaultServeMux, 5 * time.Second, "timeout")
    http.ListenAndServe(":8080", timeoutHandler)
}

我们看看TimeoutHandler的内部,本质上也是通过context.WithTimeout来做处理。

func (h *timeoutHandler) ServeHTTP(w ResponseWriter, r *Request) {
  ...
        ctx, cancelCtx = context.WithTimeout(r.Context(), h.dt)
        defer cancelCtx()
    ...
    go func() {
    ...
        h.handler.ServeHTTP(tw, r)
    }()
    select {
    ...
    case <-ctx.Done():
        ...
    }
}

场景四:HTTP服务器的request互相传递数据

context还提供了valueCtx的数据结构。

这个valueCtx最经常使用的场景就是在一个http服务器中,在request中传递一个特定值,比如有一个中间件,做cookie验证,然后把验证后的用户名存放在request中。

我们可以看到,官方的request里面是包含了Context的,并且提供了WithContext的方法进行context的替换。

package main

import (
    "net/http"
    "context"
)

type FooKey string

var UserName = FooKey("user-name")
var UserId = FooKey("user-id")

func foo(next http.HandlerFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        ctx := context.WithValue(r.Context(), UserId, "1")
        ctx2 := context.WithValue(ctx, UserName, "yejianfeng")
        next(w, r.WithContext(ctx2))
    }
}

func GetUserName(context context.Context) string {
    if ret, ok := context.Value(UserName).(string); ok {
        return ret
    }
    return ""
}

func GetUserId(context context.Context) string {
    if ret, ok := context.Value(UserId).(string); ok {
        return ret
    }
    return ""
}

func test(w http.ResponseWriter, r *http.Request) {
    w.Write([]byte("welcome: "))
    w.Write([]byte(GetUserId(r.Context())))
    w.Write([]byte(" "))
    w.Write([]byte(GetUserName(r.Context())))
}

func main() {
    http.Handle("/", foo(test))
    http.ListenAndServe(":8080", nil)
}

在使用ValueCtx的时候需要注意一点,这里的key不应该设置成为普通的String或者Int类型,为了防止不同的中间件对这个key的覆盖。最好的情况是每个中间件使用一个自定义的key类型,比如这里的FooKey,而且获取Value的逻辑尽量也抽取出来作为一个函数,放在这个middleware的同包中。这样,就会有效避免不同包设置相同的key的冲突问题了。