分析Go1.18 GMP调度器底层原理(入门)

Go 语言有强大的并发能力,能够简单的通过 go 关键字创建大量的轻量级协程 Goroutine,帮助程序快速执行各种任务,比Java等其他支持多线程的语言在并发方面更为强大,除了会用它,我们还需要掌握其底层原理,自己花时间把 GMP 调度器的底层源码学习一遍,才能对它有较为深刻的理解和掌握,本文是自己个人对于 Go语言 GMP 调度器(Go Scheduler)底层原理的学习笔记。

在学习 Go 语言的 GMP 调度器之前,原以为 GMP 底层原理较为复杂,要花很多时间和精力才能掌握,亲自下场学习之后,才发现其实并不复杂,只需三个多小时就足够:先花半个多小时,学习下刘丹冰Aceld 的 B 站讲解视频《Golang深入理解GPM模型》,然后花两个小时,结合《Go语言设计和实现》6.5节调度器的内容阅读下相关源码,最后,可以快速看看 GoLang-Scheduling In Go 前两篇文章的中译版,这样可以较快掌握 GMP 调度器的设计思想。

当然,如果希望理解的更加透彻,还需要仔细钻研几遍源码,并学习其他各种资料,尤其是 Go 开发者的文章,最好能够输出一篇文章,以加深头脑中神经元的连接和对事情本质的理解,本文就是这一学习思路的结果,希望能帮助到感兴趣的同学。

本文的代码基于 Go1.18.1 版本,跟 Go1.14 版本的调度器的主要逻辑相比,依然没有大的变化,目前看到的改动是调度循环的 runtime.findrunnable() 函数,抽取了多个逻辑封装成了新的方法,比如 M 从 其他 P 上偷取 G 的 runtime.stealWork()。

0. 结论

先给出整篇文章的结论和大纲,便于大家获取重点:

\1. 为了解决 Go 早期多线程 M 对应多协程 G 调度器的全局锁、中心化状态带来的锁竞争导致的性能下降等问题,Go 开发者引入了处理器 P 结构,形成了当前经典的 GMP 调度模型;

\2. Go 调度器是指:运行时在用户态提供的多个函数组成的一种机制,目的是高效地调度 G 到 M上去执行;

\3. Go 调度器的核心思想是:尽可能复用线程 M,避免频繁的线程创建和销毁;利用多核并行能力,限制同时运行(不包含阻塞)的 M 线程数 等于 CPU 的核心数目; Work Stealing 任务窃取机制,M 可以从其他 M 绑定的 P 的运行队列偷取 G 执行;Hand Off 交接机制,为了提高效率,M 阻塞时,会将 M 上 P 的运行队列交给其他 M 执行;基于协作的抢占机制,为了保证公平性和防止 Goroutine 饥饿问题,Go 程序会保证每个 G 运行 10ms 就让出 M,交给其他 G 去执行,这个 G 运行 10ms 就让出 M 的机制,是由单独的系统监控线程通过 retake() 函数给当前的 G 发送抢占信号实现的,如果所在的 P 没有陷入系统调用且没有满,让出的 G 优先进入本地 P 队列,否则进入全局队列;;基于信号的真抢占机制,Go1.14 引入了基于信号的抢占式调度机制,解决了 GC 垃圾回收和栈扫描时无法被抢占的问题;

\4. 由于数据局部性,新创建的 G 优先放入本地队列,在本地队列满了时,会将本地队列的一半 G 和新创建的 G 打乱顺序,一起放入全局队列;本地队列如果一直没有满,也不用担心,全局队列的 G 永远会有 1/61 的机会被获取到,调度循环中,优先从本地队列获取 G 执行,不过每隔61次,就会直接从全局队列获取,至于为啥是 61 次,Dmitry 的视频讲解了,就是要一个既不大又不小的数,而且不能跟其他的常见的2的幂次方的数如 64 或 48 重合;

\5. M 优先执行其所绑定的 P 的本地运行队列中的 G,如果本地队列没有 G,则会从全局队列获取,为了提高效率和负载均衡,会从全局队列获取多个 G,而不是只取一个,个数是自己应该从全局队列中承担的,globrunqsize / nprocs + 1;同样,当全局队列没有时,会从其他 M 的 P 上偷取 G 来运行,偷取的个数通常是其他 P 运行队列的一半;

\6. G 在运行时中的状态可以简化成三种:等待中_Gwaiting、可运行_Grunnable、运行中_Grunning,运行期间大部分情况是在这三种状态间来回切换;

\7. M 的状态可以简化为只有两种:自旋和非自旋;自旋状态,表示 M 绑定了 P 又没有获取 G;非自旋状态,表示正在执行 Go 代码中,或正在进入系统调用,或空闲;

\8. P 结构体中最重要的,是持有一个可运行 G 的长度为 256 的本地环形队列,可以通过 CAS 的方式无锁访问,跟需要加锁访问的全局队列 schedt.runq 相对应;

\9. 调度器的启动逻辑是:初始化 g0 和 m0,并将二者互相绑定, m0 是程序启动后的初始线程,g0 是 m0 线程的系统栈代表的 G 结构体,负责普通 G 在 M 上的调度切换 --> runtime.schedinit():负责M、P 的初始化过程,分别调用runtime.mcommoninit() 初始化 M 的全局队列allm 、调用 runtime.procresize() 初始化全局 P 队列 allp --> runtime.newproc():负责获取空闲的 G 或创建新的 G --> runtime.mstart() 启动调度循环;;

\10. 调度器的循环逻辑是:运行函数 schedule() --> 通过 runtime.globrunqget() 从全局队列、通过 runtime.runqget() 从 P 本地队列、 runtime.findrunnable 从各个地方,获取一个可执行的 G --> 调用 runtime.execute() 执行 G --> 调用 runtime.gogo() 在汇编代码层面上真正执行G --> 调用 runtime.goexit0() 执行 G 的清理工作,重新将 G 加入 P 的空闲队列 --> 调用 runtime.schedule() 进入下一次调度循环。

1. GMP调度模型的设计思想

1.1 传统多线程的问题

在现代的操作系统中,为了提高并发处理任务的能力,一个 CPU 核上通常会运行多个线程,多个线程的创建、切换使用、销毁开销通常较大:

1)一个内核线程的大小通常达到1M,因为需要分配内存来存放用户栈和内核栈的数据;

2)在一个线程执行系统调用(发生 IO 事件如网络请求或读写文件)不占用 CPU 时,需要及时让出 CPU,交给其他线程执行,这时会发生线程之间的切换;

3)线程在 CPU 上进行切换时,需要保持当前线程的上下文,将待执行的线程的上下文恢复到寄存器中,还需要向操作系统内核申请资源;

在高并发的情况下,大量线程的创建、使用、切换、销毁会占用大量的内存,并浪费较多的 CPU 时间在非工作任务的执行上,导致程序并发处理事务的能力降低。

图1.1 传统多线程之间的切换开销较大

1.2 Go语言早期引入的 GM 模型

为了解决传统内核级的线程的创建、切换、销毁开销较大的问题,Go 语言将线程分为了两种类型:内核级线程 M (Machine),轻量级的用户态的协程 Goroutine,至此,Go 语言调度器的三个核心概念出现了两个:

M: Machine的缩写,代表了内核线程 OS Thread,CPU调度的基本单元;

G: Goroutine的缩写,用户态、轻量级的协程,一个 G 代表了对一段需要被执行的 Go 语言程序的封装;每个 Goroutine 都有自己独立的栈存放自己程序的运行状态;分配的栈大小 2KB,可以按需扩缩容;

图1.2 Go将线程拆分为内核线程 M 和用户线程 G

在早期,Go 将传统线程拆分为了 M 和 G 之后,为了充分利用轻量级的 G 的低内存占用、低切换开销的优点,会在当前一个M上绑定多个 G,某个正在运行中的 G 执行完成后,Go 调度器会将该 G 切换走,将其他可以运行的 G 放入 M 上执行,这时一个 Go 程序中只有一个 M 线程:

图1.3 多个 G 对应一个 M

这个方案的优点是用户态的 G 可以快速切换,不会陷入内核态,缺点是每个 Go 程序都用不了硬件的多核加速能力,并且 G 阻塞会导致跟 G 绑定的 M 阻塞,其他 G 也用不了 M 去执行自己的程序了。

为了解决这些不足,Go 后来快速上线了多线程调度器:

图1.4 多个 M 对应多个 G

每个Go程序,都有多个 M 线程对应多个 G 协程,该方案有以下缺点:

1)全局锁、中心化状态带来的锁竞争导致的性能下降; 2)M 会频繁交接 G,导致额外开销、性能下降;每个 M 都得能执行任意的 runnable 状态的 G; 3)每个 M 都需要处理内存缓存,导致大量的内存占用并影响数据局部性; 4)系统调用频繁阻塞和解除阻塞正在运行的线程,增加了额外开销;

1.3 当前高效的 GMP 模型

为了解决多线程调度器的问题,Go 开发者 Dmitry Vyokov 在已有 G、M 的基础上,引入了 P 处理器,由此产生了当前 Go 中经典的 GMP 调度模型。

P:Processor的缩写,代表一个虚拟的处理器,它维护一个局部的可运行的 G 队列,可以通过 CAS 的方式无锁访问,工作线程 M 优先使用自己的局部运行队列中的 G,只有必要时才会去访问全局运行队列,这大大减少了锁冲突,提高了大量 G 的并发性。每个 G 要想真正运行起来,首先需要被分配一个 P。

如图 1.5 所示,是当前 Go 采用的 GMP 调度模型。可运行的 G 是通过处理器 P 和线程 M 绑定起来的,M 的执行是由操作系统调度器将 M 分配到 CPU 上实现的,Go 运行时调度器负责调度 G 到 M 上执行,主要在用户态运行,跟操作系统调度器在内核态运行相对应。

图1.5 当前高效的GMP调度模型

需要说明的是,Go 调度器也叫 Go 运行时调度器,或 Goroutine 调度器,指的是由运行时在用户态提供的多个函数组成的一种机制,目的是为了高效地调度 G 到 M上去执行。可以跟操作系统的调度器 OS Scheduler 对比来看,后者负责将 M 调度到 CPU 上运行。从操作系统层面来看,运行在用户态的 Go 程序只是一个请求和运行多个线程 M 的普通进程,操作系统不会直接跟上层的 G 打交道。

至于为什么不直接将本地队列放在 M 上、而是要放在 P 上呢? 这是因为当一个线程 M 阻塞(可能执行系统调用或 IO请求)的时候,可以将和它绑定的 P 上的 G 转移到其他线程 M 去执行,如果直接把可运行 G 组成的本地队列绑定到 M,则万一当前 M 阻塞,它拥有的 G 就不能给到其他 M 去执行了。

基于 GMP 模型的 Go 调度器的核心思想是:

\1. 尽可能复用线程 M:避免频繁的线程创建和销毁;

\2. 利用多核并行能力:限制同时运行(不包含阻塞)的 M 线程数为 N,N 等于 CPU 的核心数目,这里通过设置 P 处理器的个数为 GOMAXPROCS 来保证,GOMAXPROCS 一般为 CPU 核数,因为 M 和 P 是一一绑定的,没有找到 P 的 M 会放入空闲 M 列表,没有找到 M 的 P 也会放入空闲 P 列表;

\3. Work Stealing 任务窃取机制:M 优先执行其所绑定的 P 的本地队列的 G,如果本地队列为空,可以从全局队列获取 G 运行,也可以从其他 M 偷取 G 来运行;为了提高并发执行的效率,M 可以从其他 M 绑定的 P 的运行队列偷取 G 执行,这种 GMP 调度模型也叫任务窃取调度模型,这里,任务就是指 G;

\4. Hand Off 交接机制:M 阻塞,会将 M 上 P 的运行队列交给其他 M 执行,交接效率要高,才能提高 Go 程序整体的并发度;

\5. 基于协作的抢占机制:每个真正运行的G,如果不被打断,将会一直运行下去,为了保证公平,防止新创建的 G 一直获取不到 M 执行造成饥饿问题,Go 程序会保证每个 G 运行10ms 就要让出 M,交给其他 G 去执行;

\6. 基于信号的真抢占机制:尽管基于协作的抢占机制能够缓解长时间 GC 导致整个程序无法工作和大多数 Goroutine 饥饿问题,但是还是有部分情况下,Go调度器有无法被抢占的情况,例如,for 循环或者垃圾回收长时间占用线程,为了解决这些问题, Go1.14 引入了基于信号的抢占式调度机制,能够解决 GC 垃圾回收和栈扫描时存在的问题。

2. 多图详解几种常见的调度场景

在进入 GMP 调度模型的数据结构和源码之前,可以先用几张图形象的描述下 GMP 调度机制的一些场景,帮助理解 GMP 调度器为了保证公平性、可扩展性、及提高并发效率,所设计的一些机制和策略。

1)创建 G: 正在 M1 上运行的P,有一个G1,通过go func() 创建 G2 后,由于局部性,G2优先放入P的本地队列;

图2.1 正在M1上运行的G1通过go func() 创建 G2 后,由于局部性,G2优先放入P的本地队列

2)G 运行完成后:M1 上的 G1 运行完成后(调用goexit()函数),M1 上运行的 Goroutine 会切换为 G0,G0 负责调度协程的切换(运行schedule() 函数),从 M1 上 P 的本地运行队列获取 G2 去执行(函数execute());注意:这里 G0 是程序启动时的线程 M(也叫M0)的系统栈表示的 G 结构体,负责 M 上 G 的调度;

图2.2 M1 上的 G1 运行完会切换到 P 本地队列的 G2 运行

3)M 上创建的 G 个数大于本地队列长度时:如果 P 本地队列最多能存 4 个G(实际上是256个),正在 M1 上运行的 G2 要通过go func()创建 6 个G,那么,前 4 个G 放在 P 本地队列中,G2 创建了第 5 个 G(G7)时,P 本地队列中前一半和 G7 一起打乱顺序放入全局队列,P 本地队列剩下的 G 往前移动,G2 创建的第 6 个 G(G8)时,放入 P 本地队列中,因为还有空间;

图2.3 M1上的G2要创建的G个数多于P本地队列能够存放的G个数时

4)M 的自旋状态:创建新的 G 时,运行的 G 会尝试唤醒其他空闲的 M 绑定 P 去执行,如果 G2 唤醒了M2,M2 绑定了一个 P2,会先运行 M2 的 G0,这时 M2 没有从 P2 的本地队列中找到 G,会进入自旋状态(spinning),自旋状态的 M2 会尝试从全局空闲线程队列里面获取 G,放到 P2 本地队列去执行,获取的数量满足公式:n = min(len(globrunqsize)/GOMAXPROCS + 1, len(localrunsize/2)),含义是每个P应该从全局队列承担的 G 数量,为了提高效率,不能太多,要给其他 P 留点;

图2.4 创建新的 G 时,运行的G会尝试唤醒其他空闲的M绑定P去执行

5)任务窃取机制:自旋状态的 M 会寻找可运行的 G,如果全局队列为空,则会从其他 P 偷取 G 来执行,个数是其他 P 运行队列的一半;

图2.5 自旋状态的 M 会寻找可运行的 G,如果全局队列为空,则会从其他 P 偷取 G 来执行

6)G 发生系统调用时:如果 G 发生系统调度进入阻塞,其所在的 M 也会阻塞,因为会进入内核状态等待系统资源,和 M 绑定的 P 会寻找空闲的 M 执行,这是为了提高效率,不能让 P 本地队列的 G 因所在 M 进入阻塞状态而无法执行;需要说明的是,如果是 M 上的 G 进入 Channel 阻塞,则该 M 不会一起进入阻塞,因为 Channel 数据传输涉及内存拷贝,不涉及系统资源等待;

图2.6 如果 G 发生阻塞,其所在的 M 也会阻塞,和 M 绑定的 P 会寻找空闲的 M 执行

7)G 退出系统调用时:如果刚才进入系统调用的 G2 解除了阻塞,其所在的 M1 会寻找 P 去执行,优先找原来的 P,发现没有找到,则其上的 G2 会进入全局队列,等其他 M 获取执行,M1 进入空闲队列;

图 2.7 当 G 解除阻塞时,所在的 M会寻找 P 去执行,如果没有找到,则 G 进入全局队列,M 进入空闲队列

3. GMP的数据结构和各种状态

3.1 G 的数据结构和状态

G 的数据结构是:

// src/runtime/runtime2.go
type g struct {
 stack       stack       // 描述了当前 Goroutine 的栈内存范围 [stack.lo, stack.hi)
 stackguard0 uintptr     // 用于调度器抢占式调度
 _panic      *_panic     // 最内侧的 panic 结构体
 _defer      *_defer     // 最内侧的 defer 延迟函数结构体
 m           *m          // 当前 G 占用的线程,可能为空
 sched       gobuf       //  存储 G 的调度相关的数据
 atomicstatus uint32     // G 的状态
 goid         int64      //  G 的 ID
 waitreason   waitReason //当状态status==Gwaiting时等待的原因
 preempt       bool      // 抢占信号
 preemptStop   bool      // 抢占时将状态修改成 `_Gpreempted`
 preemptShrink bool      // 在同步安全点收缩栈
 lockedm        muintptr   //G 被锁定只能在这个 m 上运行
 waiting        *sudog     // 这个 g 当前正在阻塞的 sudog 结构体
 ......
}

G 的主要字段有:

​ stack:描述了当前 Goroutine 的栈内存范围 [stack.lo, stack.hi);

​ stackguard0: 可以用于调度器抢占式调度;preempt,preemptStop,preemptShrink跟抢占相关;

​ defer 和 panic:分别记录这个 G 最内侧的panic和 _defer结构体;

​ m:记录当前 G 占用的线程 M,可能为空;

​ atomicstatus:表示G 的状态;

​ sched:存储 G 的调度相关的数据;

​ goid:表示 G 的 ID,对开发者不可见;

需要展开描述的是sched 字段的 runtime.gobuf 结构体:

type gobuf struct {
 sp   uintptr      // 栈指针
 pc   uintptr      // 程序计数器,记录G要执行的下一条指令位置
 g    guintptr     // 持有 runtime.gobuf 的 G
 ret  uintptr      // 系统调用的返回值
 ......
}

这些字段会在调度器将当前 G 切换离开 M 和调度进入 M 执行程序时用到,栈指针 sp 和程序计数器 pc 用来存放或恢复寄存器中的值,改变程序执行的指令。

结构体 runtime.g 的 atomicstatus 字段存储了当前 G 的状态,G 可能处于以下状态:

const (
 // _Gidle 表示 G 刚刚被分配并且还没有被初始化
 _Gidle = iota // 0
 // _Grunnable 表示 G  没有执行代码,没有栈的所有权,存储在运行队列中
 _Grunnable // 1
 // _Grunning 可以执行代码,拥有栈的所有权,被赋予了内核线程 M 和处理器 P
 _Grunning // 2
 // _Gsyscall 正在执行系统调用,拥有栈的所有权,没有执行用户代码,被赋予了内核线程 M 但是不在运行队列上
 _Gsyscall // 3
 // _Gwaiting 由于运行时而被阻塞,没有执行用户代码并且不在运行队列上,但是可能存在于 Channel 的等待队列上
 _Gwaiting // 4
 // _Gdead 没有被使用,没有执行代码,可能有分配的栈
 _Gdead // 6
 // _Gcopystack 栈正在被拷贝,没有执行代码,不在运行队列上
 _Gcopystack // 8
 // _Gpreempted 由于抢占而被阻塞,没有执行用户代码并且不在运行队列上,等待唤醒
 _Gpreempted // 9
 // _Gscan GC 正在扫描栈空间,没有执行代码,可以与其他状态同时存在
 _Gscan          = 0x1000
 ......
)

其中主要的六种状态是:

​ Gidle:G 被创建但还未完全被初始化;

​ Grunnable:当前 G 为可运行的,正在等待被运行;

​ Grunning:当前 G 正在被运行;

​ Gsyscall:当前 G 正在被系统调用;

​ Gwaiting:当前 G 正在因某个原因而等待;

​ Gdead:当前 G 完成了运行;

图3.1描述了G从创建到结束的生命周期中经历的各种状态变化过程:

图3.1 G的状态变化

虽然 G 在运行时中定义的状态较多且复杂,但是我们可以将这些不同的状态聚合成三种:等待中、可运行、运行中,分别由_Gwaiting、_Grunnable、_Grunning 三种状态表示,运行期间大部分情况是在这三种状态来回切换:

​ 等待中:G 正在等待某些条件满足,例如:系统调用结束等,包括 _Gwaiting、_Gsyscall 几个状态; ​ 可运行:G 已经准备就绪,可以在线程 M 上运行,如果当前程序中有非常多的 G,每个 G 就可能会等待更多的时间,即 _Grunnable; ​ 运行中:G 正在某个线程 M 上运行,即 _Grunning。

3.2 M 的数据结构

M 的数据结构是:

// src/runtime/runtime2.go
type m struct {
 g0            *g          // 持有调度栈的 G
 gsignal       *g                // 处理 signal 的 g
 tls           [tlsSlots]uintptr // 线程本地存储
        mstartfn      func()      // M的起始函数,go语句携带的那个函数
 curg          *g          // 在当前线程上运行的 G
 p             puintptr    // 执行 go 代码时持有的 p (如果没有执行则为 nil)
 nextp         puintptr    // 用于暂存与当前 M 有潜在关联的 P
 oldp          puintptr    // 执行系统调用前绑定的 P
 spinning      bool        // 表示当前 M 是否正在寻找 G,在寻找过程中 M 处于自旋状态
 lockedg       guintptr    // 表示与当前 M 锁定的那个 G
 .....
}

M 的字段众多,其中最重要的为下面几个:

​ g0: Go 运行时系统在启动之初创建的,用来调度其他 G 到 M 上;

​ mstartfn:表示M的起始函数,其实就是go 语句携带的那个函数;

​ curg:存放当前正在运行的 G 的指针;

​ p:指向当前与 M 关联的那个 P;

​ nextp:用于暂存于当前 M 有潜在关联的 P;

​ spinning:表示当前 M 是否正在寻找 G,在寻找过程中 M 处于自旋状态;

​ lockedg:表示与当前M锁定的那个 G,运行时系统会把 一个 M 和一个 G 锁定,一旦锁定就只能双方相互作用,不接受第三者;

M 并没有像 G 和 P 一样的状态标记, 但可以认为一个 M 有以下的状态:

​ 自旋中(spinning): M 正在从运行队列获取 G, 这时候 M 会拥有一个 P;

​ 执行go代码中: M 正在执行go代码, 这时候 M 会拥有一个 P;

​ 执行原生代码中: M 正在执行原生代码或者阻塞的syscall, 这时M并不拥有P;

​ 休眠中: M 发现无待运行的 G 时会进入休眠, 并添加到空闲 M 链表中, 这时 M 并不拥有 P。

3.3 P 的数据结构

P 的数据结构是:

// src/runtime/runtime2.go
type p struct {
 status      uint32      // p 的状态 pidle/prunning/...
 schedtick   uint32      // 每次执行调度器调度 +1
 syscalltick uint32      // 每次执行系统调用 +1
 m           muintptr    // 关联的 m 
 mcache      *mcache     // 用于 P 所在的线程 M 的内存分配的 mcache
 deferpool    []*_defer  // 本地 P 队列的 defer 结构体池
 // 可运行的 Goroutine 队列,可无锁访问
 runqhead uint32
 runqtail uint32
 runq     [256]guintptr
 // 线程下一个需要执行的 G
 runnext guintptr
 // 空闲的 G 队列,G 状态 status 为 _Gdead,可重新初始化使用
 gFree struct {
  gList
  n int32
 }
        ......
}

最主要的数据结构是 status 表示 P 的不同的状态,而 runqhead、runqtail 和 runq 三个字段表示处理器持有的运行队列,是一个长度为256的环形队列,其中存储着待执行的 G 列表,runnext 中是线程下一个需要执行的 G;gFree 存储 P 本地的状态为_Gdead 的空闲的 G,可重新初始化使用。

P 结构体中的状态 status 字段会是以下五种中的一种:

​ _Pidle:P 没有运行用户代码或者调度器,被空闲队列或者改变其状态的结构持有,运行队列为空;

​ _Prunning:被线程 M 持有,并且正在执行用户代码或者调度器;

​ _Psyscall:没有执行用户代码,当前线程陷入系统调用;

​ _Pgcstop:被线程 M 持有,当前处理器由于垃圾回收被停止;

​ _Pdead:当前 P 已经不被使用;

P 的五种状态之间的转化关系如图 3.2 所示:

图3.2 P的状态变化

3.4 schedt 的数据结构

调度器的schedt结构体存储了全局的 G 队列,空闲的 M 列表和 P 列表:

// src/runtime/runtime2.go
type schedt struct {
 lock mutex            // schedt的锁
 midle        muintptr // 空闲的M列表
 nmidle       int32    // 空闲的M列表的数量
 nmidlelocked int32    // 被锁定正在工作的M数
 mnext        int64    // 下一个被创建的 M 的 ID
 maxmcount    int32    // 能拥有的最大数量的 M
 pidle      puintptr   // 空闲的 P 链表
 npidle     uint32     // 空闲 P 数量
 nmspinning uint32     // 处于自旋状态的 M 的数量
 // 全局可执行的 G 列表
 runq     gQueue
 runqsize int32        // 全局可执行 G 列表的大小
 // 全局 _Gdead 状态的空闲 G 列表
 gFree struct {
  lock    mutex
  stack   gList // Gs with stacks
  noStack gList // Gs without stacks
  n       int32
 }
 // sudog结构的集中存储
 sudoglock  mutex
 sudogcache *sudog
 // 有效的 defer 结构池
 deferlock mutex
 deferpool *_defer
        ......
}

除了上面的四个结构体,还有一些全局变量:

// src/runtime/runtime2.go
var (
 allm       *m         // 所有的 M
 gomaxprocs int32      // P 的个数,默认为 ncpu 核数
 ncpu       int32
 ......
 sched      schedt     // schedt 全局结构体
 newprocs   int32

 allpLock mutex       // 全局 P 队列的锁
 allp []*p            // 全局 P 队列,个数为 gomaxprocs
        ......
}

此外,src/runtime/proc.go 文件有两个全局变量:

// src/runtime/proc.go 
var (
 m0           m       //  进程启动后的初始线程
 g0            g      //  代表着初始线程的stack
 ......
)

到这里,G、M、P、schedt结构体和全局变量都描述完毕,GMP 的全部队列如下表3-1所示:

表3-1 GMP的队列

中文名源码的名称作用域简要说明全局M列表runtime.allm运行时系统存放所有M全局P列表runtime.allp运行时系统存放所有P调度器中的空闲M列表runtime.schedt.midle调度器存放空闲M调度器中的空闲P列表runtime.schedt.pidle调度器存放空闲P调度器中的可运行G队列runtime.schedt.runq调度器存放可运行GP的本地可运行G队列runtime.p.runq本地P存放当前P中的可运行G调度器中的空闲G列表runtime.schedt.gfree调度器存放空闲的GP中的空闲G列表runtime.p.gfree本地P存放当前P中的空闲G

4. 调度器的启动

4.1 程序启动流程

Go 程序一启动,Go 的运行时 runtime 自带的调度器 scheduler 就开始启动了。

对于一个最简单的Go程序:

package main

import "fmt"

func main() {
 fmt.Println("hello world")
}

通过 gdb或dlv的方式调试,会发现程序的真正入口不是在 runtime.main,对 AMD64 架构上的 Linux 和 macOS 服务器来说,分别在runtime包的 src/runtime/rt0_linux_amd64.s 和 src/runtime/rt0_darwin_amd64.s:

TEXT _rt0_amd64_linux(SB),NOSPLIT,$-8
 JMP _rt0_amd64(SB)
TEXT _rt0_amd64_darwin(SB),NOSPLIT,$-8
 JMP _rt0_amd64(SB)

两者均跳转到了 src/runtime/asm_amd64.s 包的 _rt0_amd64 函数:

TEXT _rt0_amd64(SB),NOSPLIT,$-8
 MOVQ 0(SP), DI // argc
 LEAQ 8(SP), SI // argv
 JMP runtime·rt0_go(SB)

_rt0_amd64 函数调用了 src/runtime/asm_arm64.s 包的 runtime·rt0_go 函数:

TEXT runtime·rt0_go(SB),NOSPLIT|TOPFRAME,$0
 ......
 // 初始化g0
 MOVD $runtime·g0(SB), g
        ......
 // 初始化 m0
 MOVD $runtime·m0(SB), R0
// 绑定 g0 和 m0
 MOVD g, m_g0(R0)
 MOVD R0, g_m(g)
        ......
 BL runtime·schedinit(SB)      // 调度器初始化

 // 创建一个新的 goroutine 来启动程序
 MOVD $runtime·mainPC(SB), R0    // main函数入口 
 .......
        BL runtime·newproc(SB)        // 负责根据主函数即 main 的入口地址创建可被运行时调度的执行单元goroutine
 .......

 // 开始启动调度器的调度循环
 BL runtime·mstart(SB)
 ......

DATA runtime·mainPC+0(SB)/8,$runtime·main<ABIInternal>(SB)    // main函数入口地址
GLOBL runtime·mainPC(SB),RODATA,$8

Go程序的真正启动函数 runtime·rt0_go 主要做了几件事:

1)初始化 g0 和 m0,并将二者互相绑定, m0 是程序启动后的初始线程,g0 是 m0 的系统栈代表的 G 结构体,负责普通 G 在M 上的调度切换;

2)schedinit:进行各种运行时组件初始化工作,这包括我们的调度器与内存分配器、回收器的初始化;

3)newproc:负责根据主函数即 main 的入口地址创建可被运行时调度的执行单元;

4)mstart:开始启动调度器的调度循环;

阅读 Go 调度器的源码,需要先从整体结构上对其有个把握,Go 程序启动后的调度器主逻辑如图 4.1 所示:

图4.1 调度器主逻辑

下面分为两部分来分析调度器的原理:调度器的启动和调度循环。

4.2 调度器的启动

调度器启动函数在 src/runtime/proc.go 包的 schedinit() 函数:

// src/runtime/proc.go
// 调度器初始化
func schedinit() {
 ......
 _g_ := getg()   
 ......
        // 设置机器线程数M最大为10000
 sched.maxmcount = 10000
        ......
 // 栈、内存分配器相关初始化
 stackinit()          // 初始化栈
 mallocinit()         // 初始化内存分配器
 ......
 // 初始化当前系统线程 M0
 mcommoninit(_g_.m, -1)
 ......
        // GC初始化
 gcinit()
        ......
        // 设置P的值为GOMAXPROCS个数
 procs := ncpu
 if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
  procs = n
 }
        // 调用procresize调整 P 列表
 if procresize(procs) != nil {
  throw("unknown runnable goroutine during bootstrap")
 }
 ......
}

schedinit() 函数会设置 M 最大数量为10000,实际中不会达到;会分别调用stackinit() 、mallocinit() 、mcommoninit() 、gcinit() 等执行 goroutine栈初始化、进行内存分配器初始化、进行系统线程M0的初始化、进行GC垃圾回收器的初始化;接着,将 P 个数设置为 GOMAXPROCS 的值,即程序能够同时运行的最大处理器数,最后会调用 runtime.procresize()函数初始化 P 列表。

图4.2 runtime.schedinit() 函数逻辑

schedinit() 函数负责M、P、G 的初始化过程。M/P/G 彼此的初始化顺序遵循:mcommoninit、procresize、newproc,他们分别负责初始化 M 资源池(allm)、P 资源池(allp)、G 的运行现场(g.sched)以及调度队列(p.runq)。

mcommoninit() 函数主要负责对 M0 进行一个初步的初始化,并将其添加到 schedt 全局结构体中,这里访问 schedt 会加锁:

// src/runtime/proc.go
func mcommoninit(mp *m, id int64) {
        ......
 lock(&sched.lock)

 if id >= 0 {
  mp.id = id
 } else { // mReserveID() 会返回 sched.mnext 给当前 m,并对 sched.mnext++,记录新增加的这个 M 到 schedt 全局结构体
  mp.id = mReserveID()
 }

 ......

 // 添加到 allm 中
 mp.alllink = allm

 // 等价于 allm = mp
 atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
 unlock(&sched.lock)
        ......
}

runtime.procresize()函数的逻辑是:

// src/runtime/proc.go
func procresize(nprocs int32) *p {
 ......
        // 获取先前的 P 个数
 old := gomaxprocs
 ......
 // 如果全局变量 allp 切片中的处理器数量少于期望数量,对 allp 扩容
 if nprocs > int32(len(allp)) {
  // 加锁
  lock(&allpLock)
  if nprocs <= int32(cap(allp)) { // 如果要达到的 P 个数 nprocs 小于当前全局 P 切片到容量
   allp = allp[:nprocs]    // 在当前全局 P 切片上截取前 nprocs 个 P
  } else {
                        // 否则,调大了,超出全局 P 切片的容量,创建容量为 nprocs 的新的 P 切片
   nallp := make([]*p, nprocs)
   // 将原有的 p 复制到新创建的 nallp 中
   copy(nallp, allp[:cap(allp)])
   allp = nallp  // 新的 nallp 切片赋值给旧的 allp
  }
                ......
  unlock(&allpLock)
 }

 // 使用 new 创建新的 P 结构体并调用 runtime.p.init 初始化刚刚扩容的allp列表里的 P
 for i := old; i < nprocs; i++ {
  pp := allp[i]
                // 如果 p 是新创建的(新创建的 p 在数组中为 nil),则申请新的 P 对象
  if pp == nil {
   pp = new(p)
  }
  pp.init(i)
  atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
 }

 _g_ := getg()
        // 当前 G 的 M 上的 P 不为空,并且其 id 小于 nprocs,说明 ID 有效,则可以继续使用当前 G 的 P
 if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
  // 继续使用当前 P, 其状态设置为 _Prunning
  _g_.m.p.ptr().status = _Prunning
  _g_.m.p.ptr().mcache.prepareForSweep()
 } else {
  // 否则,释放当前 P 并获取 allp[0]
  if _g_.m.p != 0 {
   ......
   _g_.m.p.ptr().m = 0
  }
  _g_.m.p = 0
                // 将处理器 allp[0] 绑定到当前 M
  p := allp[0]
  p.m = 0
  p.status = _Pidle  // P 状态设置为 _Pidle 
  acquirep(p)        // 将allp[0]绑定到当前的 M
  if trace.enabled {
   traceGoStart()
  }
 }

 
 mcache0 = nil

 // 调用 runtime.p.destroy 释放从未使用的 P
 for i := nprocs; i < old; i++ {
  p := allp[i]
  p.destroy()
  // 不能释放 p 本身,因为他可能在 m 进入系统调用时被引用
 }

 // 裁剪 allp,保证allp长度与期望处理器数量相等
 if int32(len(allp)) != nprocs {
  lock(&allpLock)
  allp = allp[:nprocs]
  idlepMask = idlepMask[:maskWords]
  timerpMask = timerpMask[:maskWords]
  unlock(&allpLock)
 }

 var runnablePs *p
        // 将除 allp[0] 之外的处理器 P 全部设置成 _Pidle 并加入到全局的空闲队列中
 for i := nprocs - 1; i >= 0; i-- {
  p := allp[i]
  if _g_.m.p.ptr() == p {    // 跳过当前 P
   continue
  }
  p.status = _Pidle          // 设置 P 的状态为空闲状态
  if runqempty(p) {
   pidleput(p)        // 放入到全局结构体 schedt 的空闲 P 列表中
  } else {
   p.m.set(mget())    // 如果有本地任务,则为其绑定一个 M
   p.link.set(runnablePs)
   runnablePs = p
  }
 }
 stealOrder.reset(uint32(nprocs))
 var int32p *int32 = &gomaxprocs 
 atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
 return runnablePs      // 返回所有包含本地任务的 P 链表
}

runtime.procresize() 函数 的执行过程如下:

1)如果全局变量 allp 切片中的 P 数量少于期望数量,会对切片进行扩容;

2)使用 new 创建新的 P 结构体并调用 runtime.p.init 初始化刚刚扩容的 P;

3)通过指针将线程 m0 和处理器 allp[0] 绑定到一起;

4)调用 runtime.p.destroy 释放不再使用的 P 结构;

5)通过切片截断改变全局变量 allp 的长度,保证它与期望 P 数量相等;

6)将除 allp[0] 之外的处理器 P 全部设置成 _Pidle 并加入到全局 schedt 的空闲 P 队列中;

runtime.procresize() 函数的逻辑如图 4.3 所示:

图4.3 runtime.procresize() 函数逻辑

runtime.procresize() 函数调用 runtime.p.init 初始化新创建的 P:

// src/runtime/proc.go
// 初始化 P
func (pp *p) init(id int32) {
 pp.id = id             // p 的 id 就是它在 allp 中的索引
 pp.status = _Pgcstop   // 新创建的 p 处于 _Pgcstop 状态
 ......
        // 为 P 分配 cache 对象,涉及对象分配
 if pp.mcache == nil {
  if id == 0 {
   if mcache0 == nil {
    throw("missing mcache?")
   }
   pp.mcache = mcache0
  } else {
   pp.mcache = allocmcache()
  }
 }
 ......
}

需要说明的是,mcache内存结构原来是在 M 上的,自从引入了 P 之后,就将该结构体移到了P上,这样,就不用每个 M 维护自己的内存分配 mcache,由于 P 在有 M 可以执行时才会移动到其他 M 上去,空闲的 M 无须分配内存,这种灵活性使整体线程的内存分配大大减少。

4.3 怎样创建 G ?

我们再回到 4.1 节对程序启动函数 runtime·rt0_go,有个动作是通过 runtime.newproc 函数创建 G,runtime.newproc 入参是 funcval 结构体函数,代表 go 关键字后面调用的函数:

// src/runtime/proc.go
// 创建G,并放入 P 的运行队列
func newproc(fn *funcval) {
 gp := getg()
 pc := getcallerpc()    // 获取调用方 PC 寄存器值,即调用方程序要执行的下一个指令地址
        // 用 g0 系统栈创建 Goroutine 对象
        // 传递的参数包括 fn 函数入口地址, gp(g0),调用方 pc
 systemstack(func() {
  newg := newproc1(fn, gp, pc)  // 调用 newproc1 获取 Goroutine 结构

  _p_ := getg().m.p.ptr()       // 获取当前 G 的 P 
  runqput(_p_, newg, true)      // 将新的 G 放入 P 的本地运行队列

  if mainStarted {              // M 启动时唤醒新的 P 执行 G
   wakep()
  }
 })
}

runtime.newproce函数主要是调用 runtime.newproc1 获取新的 Goroutine 结构,将新的 G 放入P的运行队列,M 启动时唤醒新的 P 执行 G。

runtime.newproce函数的逻辑如图4.4所示:

图4.4 runtime.newproce() 函数逻辑

runtime.newproce1() 函数的逻辑是:

// src/runtime/proc.go
// 创建一个运行fn函数的goroutine
func newproc1(fn *funcval, callergp *g, callerpc uintptr) *g {
 _g_ := getg()       // 因为是在系统栈运行所以此时的 g 为 g0

 if fn == nil {
  _g_.m.throwing = -1 // do not dump full stacks
  throw("go of nil func value")
 }
 acquirem() // 加锁,禁止这时 G 的 M 被抢占,因为它可以在一个局部变量中保存 P

 _p_ := _g_.m.p.ptr()         // 获取 P
 newg := gfget(_p_)           // 从 P 的空闲列表获取一个空闲的 G
 if newg == nil {             // 找不到则创建
  newg = malg(_StackMin)     // 创建一个栈大小为 2K 的 G
  casgstatus(newg, _Gidle, _Gdead)     // CAS 改变 G 的状态为_Gdead
  allgadd(newg) // 将 _Gdead 状态的 g 添加到 allg,这样 GC 不会扫描未初始化的栈
 }
 ......
        // 计算运行空间大小,对齐
 totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize) 
 totalSize = alignUp(totalSize, sys.StackAlign)
 sp := newg.stack.hi - totalSize       // 确定 SP 和参数入栈位置
 spArg := sp
 ......
        // 清理、创建并初始化 G 的运行现场
 memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
 newg.sched.sp = sp
 newg.stktopsp = sp
// 保存goexit的地址到sched.pc
 newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
 newg.sched.g = guintptr(unsafe.Pointer(newg))
 gostartcallfn(&newg.sched, fn)
        // 初始化 G 的基本状态
 newg.gopc = callerpc
 newg.ancestors = saveAncestors(callergp)
 newg.startpc = fn.fn
 ......
        // 将 G 的状态设置为_Grunnable
 casgstatus(newg, _Gdead, _Grunnable)
 ......
        // 生成唯一的goid
 newg.goid = int64(_p_.goidcache)
 _p_.goidcache++
 ......
        // 释放对 M 加的锁
 releasem(_g_.m)

 return newg
}

runtime.newproc1() 函数主要执行三个动作: 1)获取或者创建新的 Goroutine 结构体,会先从处理器的 gFree 列表中查找空闲的 Goroutine,如果不存在空闲的 Goroutine,会通过 runtime.malg 创建一个栈大小足够的新结构体,新创建的 G 的状态为_Gdead; 2)将传入的参数 callergp,callerpc,fn更新到 G 的栈上,初始化 G 的相关参数; 3)将 G 状态设置为 _Grunnable 状态,返回;

runtime.newproc1() 函数的逻辑如图 4.5 所示:

图4.5 runtime.newproc1() 函数逻辑

runtime.newproc1() 函数主要调用 runtime.gfget() 函数 获取 G:

// src/runtime/proc.go
func gfget(_p_ *p) *g {
retry:
        // 如果 P 的空闲列表 gFree 为空,sched 的的空闲列表 gFree 不为空
 if _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {
  lock(&sched.gFree.lock)
  // 从 sched 的 gFree 列表中移动 32 个 G 到 P 的 gFree 中
  for _p_.gFree.n < 32 {
   gp := sched.gFree.stack.pop()
   if gp == nil {
    gp = sched.gFree.noStack.pop()
    if gp == nil {
     break
    }
   }
   sched.gFree.n--
   _p_.gFree.push(gp)
   _p_.gFree.n++
  }
  unlock(&sched.gFree.lock)
  goto retry
 }
        // 如果此时 P 的空闲列表还是为空,返回nil,说明无空闲的G
 gp := _p_.gFree.pop()
 if gp == nil {
  return nil
 }
 _p_.gFree.n--
        // 设置 G 的栈空间
 if gp.stack.lo == 0 {
  systemstack(func() {
   gp.stack = stackalloc(_FixedStack)
  })
  gp.stackguard0 = gp.stack.lo + _StackGuard
 } else {
  .....
 }
 return gp   // 从 P 的空闲列表获取 G 返回
}

runtime.gfget() 函数的主要逻辑是:当 P 的空闲列表 gFree 为空时,从 sched 持有的全局空闲列表 gFree 中移动最多 32个 G 到当前的 P 的空闲列表上;然后从 P 的 gFree 列表头返回一个 G;如果还是没有,则返回空,说明获取不到空闲的 G。

在 runtime.newproc1() 函数中,如果不存在空闲的 G,会通过 runtime.malg() 创建一个栈大小足够的新结构体:

// src/runtime/proc.go
// 创建一个新的 g 结构体 
func malg(stacksize int32) *g {
 newg := new(g)
 if stacksize >= 0 {     // 如果申请的堆栈大小大于 0,会通过 runtime.stackalloc 分配 2KB 的栈空间
  stacksize = round2(_StackSystem + stacksize)
  systemstack(func() {
   newg.stack = stackalloc(uint32(stacksize))
  })
  newg.stackguard0 = newg.stack.lo + _StackGuard
  newg.stackguard1 = ^uintptr(0)
  *(*uintptr)(unsafe.Pointer(newg.stack.lo)) = 0
 }
 return newg
}

回到 runtime.newproce函数,在获取到 G 后,会调用 runtime.runqput() 函数将 G 放入 P 本地队列,或全局队列:

// src/runtime/proc.go
// 将 G 放入 P 的运行队列中
func runqput(_p_ *p, gp *g, next bool) {
        // 保持一定的随机性,不将当前 G 设置为 P 的下一个执行的任务
 if randomizeScheduler && next && fastrandn(2) == 0 {
  next = false
 }

 if next {
 retryNext:
               // 将 G 放入到 P 的 runnext 变量中,作为下一个 P 执行的任务
  oldnext := _p_.runnext
  if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
   goto retryNext
  }
  if oldnext == 0 {
   return
  }
  // 获取原来的 runnext 存储的 G,放入 P 本地运行队列,或全局队列
  gp = oldnext.ptr()
 }

retry:
 h := atomic.LoadAcq(&_p_.runqhead) // 获取 P 环形队列的头和尾部指针
 t := _p_.runqtail
        // P 本地环形队列没有满,将 G 放入本地环形队列
 if t-h < uint32(len(_p_.runq)) {
  _p_.runq[t%uint32(len(_p_.runq))].set(gp)
  atomic.StoreRel(&_p_.runqtail, t+1) 
  return
 }
        // P 本地环形队列已满,将 G 放入全局队列
 if runqputslow(_p_, gp, h, t) {
  return
 }
 // 本地队列和全局队列没有满,则不会走到这里,否则循环尝试放入
 goto retry
}

runtime.runqput() 函数的主要处理逻辑是:

1)保留一定的随机性,设置 next 为 false,即不将当前 G 设置为 P 的下一个执行的 G;

2)当 next 为 true 时,将 G 设置到 P 的 runnext 作为 P 下一个执行的任务;

3)当 next 为 false 并且本地运行队列还有剩余空间时,将 Goroutine 加入处理器持有的本地运行队列;

4)当处理器的本地运行队列已经没有剩余空间时,就会把本地队列中的一部分 G 和待加入的 G 通过 runtime.runqputslow 添加到调度器持有的全局运行队列上;

runtime.runqput() 函数的逻辑如图 4.6 所示:

图4.6 runtime.runqput() 函数的逻辑

runtime.runqputslow() 函数的逻辑如下:

// 将 G 和 P 本地队列的一部分放入全局队列
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
 var batch [len(_p_.runq)/2 + 1]*g   // 初始化一个本地队列长度一半 + 1 的 G 列表 batch

 // 首先,从 P 本地队列中获取一部分 G 放入初始化的列表 batch
 n := t - h
 n = n / 2
 if n != uint32(len(_p_.runq)/2) {
  throw("runqputslow: queue is not full")
 }
 for i := uint32(0); i < n; i++ { // 将 P 本地环形队列的前一半 G 放入batch
  batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
 }
 if !atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
  return false
 }
 batch[n] = gp    // 将传入的 G 放入列表 batch 的尾部

 if randomizeScheduler {     // 打乱 batch 列表中G的顺序
  for i := uint32(1); i <= n; i++ {
   j := fastrandn(i + 1)
   batch[i], batch[j] = batch[j], batch[i]
  }
 }

 // 将 batch列表的 G 串成一个链表.
 for i := uint32(0); i < n; i++ {
  batch[i].schedlink.set(batch[i+1])
 }
 var q gQueue     // 将 batch 列表设置成 gQueue 队列
 q.head.set(batch[0])
 q.tail.set(batch[n])

 // 现在把 gQueue 队列放入全局队列
 lock(&sched.lock)
 globrunqputbatch(&q, int32(n+1))
 unlock(&sched.lock)
 return true
}

runtime.runqputslow() 函数会把 P 本地环形队列的前一半 G 获取出来,跟传入的 G 组成一个列表,打乱顺序,再放入全局队列。

综上所属,用下图表示调度器启动流程:

图4.7 调度器启动流程

5. 调度循环

我们再回到5.1节的程序启动流程,runtime·rt0_go 函数在调用 runtime.schedinit() 初始化好了调度器、调用 runtime.newproc()创建了main函数的 G 后,会调用runtime.mstart() 函数启动 M 去执行G。

TEXT runtime·mstart(SB),NOSPLIT|TOPFRAME,$0
 CALL runtime·mstart0(SB)
 RET // not reached

runtime.mstart() 是用汇编写的,会直接调用 runtime.mstart0() 函数:

// src/runtime/proc.go
func mstart0() {
 _g_ := getg()
 ......
        // 初始化 g0 的参数
 _g_.stackguard0 = _g_.stack.lo + _StackGuard
 _g_.stackguard1 = _g_.stackguard0
 mstart1()

 ......
 mexit(osStack)
}

runtime.mstart0() 函数主要调用 runtime.mstart1():

// src/runtime/proc.go
func mstart1() {
 _g_ := getg()

 if _g_ != _g_.m.g0 {
  throw("bad runtime·mstart")
 }

 // 记录当前栈帧,便于其他调用复用,当进入 schedule 之后,再也不会回到 mstart1
 _g_.sched.g = guintptr(unsafe.Pointer(_g_))
 _g_.sched.pc = getcallerpc()
 _g_.sched.sp = getcallersp()

 asminit()
 minit()

 // 设置信号 handler;在 minit 之后,因为 minit 可以准备处理信号的的线程
 if _g_.m == &m0 {
  mstartm0()
 }
        // 执行启动函数
 if fn := _g_.m.mstartfn; fn != nil {
  fn()
 }
        // 如果当前 m 并非 m0,则要求绑定 p
 if _g_.m != &m0 {
  acquirep(_g_.m.nextp.ptr())
  _g_.m.nextp = 0
 }
        // 准备好后,开始调度循环,永不返回
 schedule()
}

runtime.mstart1() 保存调度信息后,会调用 runtime.schedule() 进入调度循环,寻找一个可执行的 G 并执行。

循环调度主逻辑如图5.1所示:

图5.1 循环调度主逻辑

runtime.schedule() 函数的逻辑是:

// src/runtime/proc.go
func schedule() {
 _g_ := getg()

 if _g_.m.locks != 0 {
  throw("schedule: holding locks")
 }
 ......
top:
 pp := _g_.m.p.ptr()
 pp.preempt = false

 if sched.gcwaiting != 0 {    // 如果需要 GC,不再进行调度
  gcstopm()
  goto top
 }
 if pp.runSafePointFn != 0 {  // 不等于0,说明在安全点
  runSafePointFn()
 }

 // 如果 G 所在的 M 在自旋,说明其P运行队列为空,如果不为空,则应该甩出错误
 if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
  throw("schedule: spinning with local work")
 }
        // 运行 P 上准备就绪的 Timer
 checkTimers(pp, 0)

 var gp *g
 var inheritTime bool
        ......
 if gp == nil {    // 说明不在 GC
  // 每调度 61 次,就检查一次全局队列,保证公平性;否则两个 Goroutine 可以通过互换,一直占领本地的 runqueue
  if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
   lock(&sched.lock)
   gp = globrunqget(_g_.m.p.ptr(), 1)     // 从全局队列中偷 g
   unlock(&sched.lock)
  }
 }
 if gp == nil {
                // 从 P 的本地队列获取 G
  gp, inheritTime = runqget(_g_.m.p.ptr())
 }
 if gp == nil {
  gp, inheritTime = findrunnable() // 阻塞式查找可用 G
 }

 // M 这时候一定是获取到了G
 // 如果 M 是自旋状态,重置其状态到非自旋
 if _g_.m.spinning {
  resetspinning()
 }
        .......
 // 执行 G
 execute(gp, inheritTime)
}

runtime.schedule() 函数会从下面几个地方查找待执行的 Goroutine:

1)为了保证公平,当全局运行队列中有待执行的 G 时,通过 schedtick 对 61 取模,表示每 61 次会有一次从全局的运行队列中查找对应的 G,这样可以避免两个 G 在 P 本地队列互换一直占有本地队列; 2)调用 runtime.runqget() 函数从 P 本地的运行队列中获取待执行的 G; 3)如果前两种方法都没有找到 G,会通过 runtime.findrunnable() 进行阻塞地查找 G;

runtime.schedule 函数从全局队列获取 G 的函数是 runtime.globrunqget() 函数:

// 从全局队列获取 G
func globrunqget(_p_ *p, max int32) *g {
 assertLockHeld(&sched.lock)
        // 如果全局队列没有 G,则直接返回
 if sched.runqsize == 0 {
  return nil
 }
        // 计算n,表示从全局队列放入本地队列的 G 的个数
 n := sched.runqsize/gomaxprocs + 1
 if n > sched.runqsize {
  n = sched.runqsize
 }
        // n 不能超过取的要获取的max个数 
 if max > 0 && n > max {
  n = max
 }
        // 计算能不能用本地队列的一般放下 n 个 G,如果放不下,则 n 设为本地队列的一半
 if n > int32(len(_p_.runq))/2 {
  n = int32(len(_p_.runq)) / 2
 }

 sched.runqsize -= n
        // 拿到全局队列的队头作为返回的 G
 gp := sched.runq.pop()   
 n--   // n计数减 1
        // 继续取剩下的 n-1个全局队列 G 放入本地队列
 for ; n > 0; n-- {
  gp1 := sched.runq.pop()
  runqput(_p_, gp1, false)
 }
 return gp
}

runtime.globrunqget() 函数会从全局队列获取 n 个 G,第一个 G 返回给调度器去执行,剩下的 n-1 个 G 放入本地队列,其中,n一般为全局队列长度 / P处理器个数 + 1,含义是平均每个 P 应该从全局队列中承担的 G 数量,且不能超过 P 本地长度的一半。

runtime.schedule() 函数调用 runtime.runqget() 函数从 P 本地的运行队列中获取待执行的 G:

// 从 P 本地队列中获取 G
func runqget(_p_ *p) (gp *g, inheritTime bool) {
 // 如果 P 有一个 runnext,则它就是下一个要执行的 G.
 next := _p_.runnext
 // 如果 runnext 不为空,而 CAS 失败, 则它又可能被其他 P 偷取了,
 // 因为其他 P 可以竞争机会到设置 runnext 为 0, 当前 P 只能设置该字段为非0
 if next != 0 && _p_.runnext.cas(next, 0) {
  return next.ptr(), true
 }

 for {
  h := atomic.LoadAcq(&_p_.runqhead) //从本地环形队列头遍历
  t := _p_.runqtail
  if t == h {    // 头尾指针相等,表示本地队列为空
   return nil, false
  }
                // 获取头部指针指向的 G
  gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
  if atomic.CasRel(&_p_.runqhead, h, h+1) { 
   return gp, false
  }
 }
}

本地队列的获取会先从 P 的 runnext 字段中获取,如果不为空则直接返回。如果 runnext 为空,那么从本地环形队列头指针遍历本地队列,取到了则返回。

阻塞式获取 G 的 runtime.findrunnable() 函数的整个逻辑看起来比较繁琐,其实无非是按这个顺序获取 G: local -> global -> netpoll -> steal -> local -> global -> netpoll:

// 找到一个可运行的 G 去执行
// 会从其他 P 的运行队列偷取,从本地会全局队列获取,或从网络轮询器获取
func findrunnable() (gp *g, inheritTime bool) {
 _g_ := getg()

top:
 _p_ := _g_.m.p.ptr()
 if sched.gcwaiting != 0 {     // 如果在 gc,则休眠当前 m,直到复始后回到 top
  gcstopm()
  goto top
 }
 if _p_.runSafePointFn != 0 {  // 不等于0,说明在安全点
  runSafePointFn()
 }

 now, pollUntil, _ := checkTimers(_p_, 0)
 

 // 取本地队列 local runq,如果已经拿到,立刻返回
 if gp, inheritTime := runqget(_p_); gp != nil {
  return gp, inheritTime
 }

 // 全局队列 global runq,如果已经拿到,立刻返回
 if sched.runqsize != 0 {
  lock(&sched.lock)
  gp := globrunqget(_p_, 0)
  unlock(&sched.lock)
  if gp != nil {
   return gp, false
  }
 }

 // 从 netpoll 网络轮询器中尝试获取 G,优先级比从其他 P 偷取 G 要高
 if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
  if list := netpoll(0); !list.empty() { // non-blocking
   gp := list.pop()
   injectglist(&list)
   casgstatus(gp, _Gwaiting, _Grunnable)
   if trace.enabled {
    traceGoUnpark(gp, 0)
   }
   return gp, false
  }
 }

 // 自旋 M: 从其他 P 中窃取任务 G
 
 // 限制自旋 M 数量到忙碌P数量的一半. 避免一半P数量、并行机制很慢时的CPU消耗
 procs := uint32(gomaxprocs)
 if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
  if !_g_.m.spinning {
   _g_.m.spinning = true
   atomic.Xadd(&sched.nmspinning, 1)
  }
                // 从其他 P 或 timer 中偷取G
  gp, inheritTime, tnow, w, newWork := stealWork(now)
  now = tnow
  if gp != nil {
   // Successfully stole.
   return gp, inheritTime
  }
  if newWork {
   // 可能有新的 timer 或 GC,重新开始
   goto top
  }
  if w != 0 && (pollUntil == 0 || w < pollUntil) {
   // Earlier timer to wait for.
   pollUntil = w
  }
 }

 // 没有任何 work 可做。
        // 如果我们在 GC mark 阶段,则可以安全的扫描并标记对象为黑色
        // 然后便有 work 可做,运行 idle-time 标记而非直接放弃当前的 P。
 if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
  node := (*gcBgMarkWorkerNode)(gcBgMarkWorkerPool.pop())
  if node != nil {
   _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
   gp := node.gp.ptr()
   casgstatus(gp, _Gwaiting, _Grunnable)
   if trace.enabled {
    traceGoUnpark(gp, 0)
   }
   return gp, false
  }
 }
        .....
 // 放弃当前的 P 之前,对 allp 做一个快照
 allpSnapshot := allp
 idlepMaskSnapshot := idlepMask
 timerpMaskSnapshot := timerpMask

 // 准备归还 p,对调度器加锁
 lock(&sched.lock)
        // 进入了 gc,回到顶部并停止 m
 if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
  unlock(&sched.lock)
  goto top
 }
        // 全局队列中又发现了任务
 if sched.runqsize != 0 {
  gp := globrunqget(_p_, 0)     // 赶紧偷掉返回
  unlock(&sched.lock)
  return gp, false
 }
 if releasep() != _p_ {         // 归还当前的 p
  throw("findrunnable: wrong p")
 }
 pidleput(_p_)       // 将 p 放入 idle 链表
 unlock(&sched.lock)      // 完成归还,解锁

 // 这里要非常小心: 线程从自旋到非自旋状态的转换,可能与新 Goroutine 的提交同时发生
 wasSpinning := _g_.m.spinning
 if _g_.m.spinning {
  _g_.m.spinning = false    // M 即将睡眠,状态不再是 spinning
  if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
   throw("findrunnable: negative nmspinning")
  }

  // 再次检查所有的 runqueue
  _p_ = checkRunqsNoP(allpSnapshot, idlepMaskSnapshot)
  if _p_ != nil {
   acquirep(_p_)
   _g_.m.spinning = true
   atomic.Xadd(&sched.nmspinning, 1)
   goto top
  }

  // 再次检查 idle-priority GC work,和上面重新找 runqueue 的逻辑类似
  _p_, gp = checkIdleGCNoP()
  if _p_ != nil {
   acquirep(_p_)
   _g_.m.spinning = true
   atomic.Xadd(&sched.nmspinning, 1)

   // Run the idle worker.
   _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
   casgstatus(gp, _Gwaiting, _Grunnable)
   if trace.enabled {
    traceGoUnpark(gp, 0)
   }
   return gp, false
  }

  // 最后, 检查 timer creation
  pollUntil = checkTimersNoP(allpSnapshot, timerpMaskSnapshot, pollUntil)
 }

 // 再次检查 netpoll 网络轮询器,和上面重新找 runqueue 的逻辑类似
 if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
  ......
  lock(&sched.lock)
  _p_ = pidleget()
  unlock(&sched.lock)
  if _p_ == nil {
   injectglist(&list)
  } else {
   acquirep(_p_)
   if !list.empty() {
    gp := list.pop()
    injectglist(&list)
    casgstatus(gp, _Gwaiting, _Grunnable)
    if trace.enabled {
     traceGoUnpark(gp, 0)
    }
    return gp, false
   }
   if wasSpinning {
    _g_.m.spinning = true
    atomic.Xadd(&sched.nmspinning, 1)
   }
   goto top
  }
 } else if pollUntil != 0 && netpollinited() {
  pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
  if pollerPollUntil == 0 || pollerPollUntil > pollUntil {
   netpollBreak()
  }
 }
 stopm()     // 真的什么都没找到,暂止当前的 m
 goto top
}

runtime.findrunnable 函数的主要工作是:

1)首先检查是是否正在进行 GC,如果是则休眠当前的 M ; 2)尝试从本地队列中取 G,如果取到,则直接返回,否则继续从全局队列中找 G,如果找到则直接返回; 3)检查 netpoll 网络轮询器中是否有 G,如果有,则直接返回; 4)如果此时仍然无法找到 G,则从其他 P 的本地队列中偷取;从其他 P 本地队列偷取的工作会执行四轮,如果找到 G,则直接返回; 5)所有的可能性都尝试过了,在准备休眠 M 之前,还要进行额外的检查; 6)首先检查此时是否是 GC mark 阶段,如果是,则直接返回 mark 阶段的 G; 7)如果仍然没有,则对当前的 P 进行快照,准备对调度器进行加锁; 8)当调度器被锁住后,仍然还需再次检查这段时间里是否有进入 GC,如果已经进入了 GC,则回到第一步,阻塞 M 并休眠; 9)如果又在全局队列中发现了 G,则直接返回; 10)当调度器被锁住后,我们彻底找不到任务了,则归还释放当前的 P,将其放入 idle 链表中,并解锁调度器; 11)当 M、P 已经解绑后,我们需要将 M 的状态切换出自旋状态,并减少 nmspinning; 12)此时仍然需要重新检查所有的队列,如果我们又在全局队列中发现了 g,则直接返回; 13)还需要再检查是否存在 poll 网络的 G,如果有,则直接返回; 14)什么也没找到,休眠当前的 M。

runtime.findrunnable 函数的逻辑如图 5.2 所示:

图5.2 runtime.findrunnable()函数逻辑

如果调度循环函数 runtime.schedule() 从通过 runtime.globrunqget() 从全局队列,通过 runtime.runqget() 从 P 本地队列,以及 runtime.findrunnable 从各个地方,获取到了一个可执行的 G, 则会调用 runtime.execute() 函数去执行它,它会通过 runtime.gogo() 将 G 调度到当前线程上开始真正执行,之后 runtime.gogo() 会调用 runtime.goexit(),并依次进入runtime.goexit1(),和 runtime.goexit0(),最后在 runtime.goexit0() 函数的结尾会再次调用 runtime.schedule() ,进入下一次调度循环。

6. 总结

总结的内容已经放在了开头的结论中了。

最近听到一句话:任何领域的顶尖高手,都是在花费大量时间和身心投入后,达到了用灵魂触碰到更高维度的真实存在的境界,而不是在用头脑在思考和工作,因此作出来的产品都极具美感、实用性和创造性,就好像偷取了上帝的创意一样。

在 Go 调度器的底层原理的学习中,不仅需要亲自花时间去分析源码的细节,更加要大量阅读 Go 开发者的文章,需要用心体会机制设计背后的原因。

Reference

Go语言设计与实现6.5节调度器 https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-goroutine/#g

Go语言原本6.3节MPG模型与并发调度单元 https://golang.design/under-the-hood/zh-cn/part2runtime/ch06sched/mpg/

Go调度器系列(3)图解调度原理 https://lessisbetter.site/2019/04/04/golang-scheduler-3-principle-with-graph/

Golang的协程调度器原理及GMP设计思想 https://www.yuque.com/aceld/golang/srxd6d

详解Go语言调度循环源码实现 https://www.luozhiyun.com/archives/448

golang源码分析之协程调度器底层实现( G、M、P) https://blog.csdn.net/qq_25870633/article/details/83445946

「译」Scheduling In Go Part I - OS Scheduler https://blog.lever.wang/golang/os_schedule/

「译」Scheduling In Go Part II - Go Scheduler https://blog.lever.wang/golang/go_schedule/

深入 golang -- GMP调度 https://zhuanlan.zhihu.com/p/502740833

深度解密 Go 语言之 scheduler https://qcrao.com/post/dive-int