# 深入分析Go1.18 GMP调度器底层原理(精通) **Published by:** [okscan.eth](https://paragraph.com/@okscan/) **Published on:** 2023-01-24 **URL:** https://paragraph.com/@okscan/go1-18-gmp ## Content Go 语言有强大的并发能力,能够简单的通过 go 关键字创建大量的轻量级协程 Goroutine,帮助程序快速执行各种任务,比Java等其他支持多线程的语言在并发方面更为强大,除了会用它,我们还需要掌握其底层原理,自己花时间把 GMP 调度器的底层源码学习一遍,才能对它有较为深刻的理解和掌握,本文是自己个人对于 Go语言 GMP 调度器(Go Scheduler)底层原理的学习笔记。 在学习 Go 语言的 GMP 调度器之前,原以为 GMP 底层原理较为复杂,要花很多时间和精力才能掌握,亲自下场学习之后,才发现其实并不复杂,只需三个多小时就足够:先花半个多小时,学习下刘丹冰Aceld 的 B 站讲解视频《Golang深入理解GPM模型》,然后花两个小时,结合《Go语言设计和实现》6.5节调度器的内容阅读下相关源码,最后,可以快速看看 GoLang-Scheduling In Go 前两篇文章的中译版,这样可以较快掌握 GMP 调度器的设计思想。 当然,如果希望理解的更加透彻,还需要仔细钻研几遍源码,并学习其他各种资料,尤其是 Go 开发者的文章,最好能够输出一篇文章,以加深头脑中神经元的连接和对事情本质的理解,本文就是这一学习思路的结果,希望能帮助到感兴趣的同学。 本文的代码基于 Go1.18.1 版本,跟 Go1.14 版本的调度器的主要逻辑相比,依然没有大的变化,目前看到的改动是调度循环的 runtime.findrunnable() 函数,抽取了多个逻辑封装成了新的方法,比如 M 从 其他 P 上偷取 G 的 runtime.stealWork()。0. 结论先给出整篇文章的结论和大纲,便于大家获取重点: \1. 为了解决 Go 早期多线程 M 对应多协程 G 调度器的全局锁、中心化状态带来的锁竞争导致的性能下降等问题,Go 开发者引入了处理器 P 结构,形成了当前经典的 GMP 调度模型; \2. Go 调度器是指:运行时在用户态提供的多个函数组成的一种机制,目的是高效地调度 G 到 M上去执行; \3. Go 调度器的核心思想是:尽可能复用线程 M,避免频繁的线程创建和销毁;利用多核并行能力,限制同时运行(不包含阻塞)的 M 线程数 等于 CPU 的核心数目; Work Stealing 任务窃取机制,M 可以从其他 M 绑定的 P 的运行队列偷取 G 执行;Hand Off 交接机制,为了提高效率,M 阻塞时,会将 M 上 P 的运行队列交给其他 M 执行;基于协作的抢占机制,为了保证公平性和防止 Goroutine 饥饿问题,Go 程序会保证每个 G 运行 10ms 就让出 M,交给其他 G 去执行,这个 G 运行 10ms 就让出 M 的机制,是由单独的系统监控线程通过 retake() 函数给当前的 G 发送抢占信号实现的,如果所在的 P 没有陷入系统调用且没有满,让出的 G 优先进入本地 P 队列,否则进入全局队列;;基于信号的真抢占机制,Go1.14 引入了基于信号的抢占式调度机制,解决了 GC 垃圾回收和栈扫描时无法被抢占的问题; \4. 由于数据局部性,新创建的 G 优先放入本地队列,在本地队列满了时,会将本地队列的一半 G 和新创建的 G 打乱顺序,一起放入全局队列;本地队列如果一直没有满,也不用担心,全局队列的 G 永远会有 1/61 的机会被获取到,调度循环中,优先从本地队列获取 G 执行,不过每隔61次,就会直接从全局队列获取,至于为啥是 61 次,Dmitry 的视频讲解了,就是要一个既不大又不小的数,而且不能跟其他的常见的2的幂次方的数如 64 或 48 重合; \5. M 优先执行其所绑定的 P 的本地运行队列中的 G,如果本地队列没有 G,则会从全局队列获取,为了提高效率和负载均衡,会从全局队列获取多个 G,而不是只取一个,个数是自己应该从全局队列中承担的,globrunqsize / nprocs + 1;同样,当全局队列没有时,会从其他 M 的 P 上偷取 G 来运行,偷取的个数通常是其他 P 运行队列的一半; \6. G 在运行时中的状态可以简化成三种:等待中_Gwaiting、可运行_Grunnable、运行中_Grunning,运行期间大部分情况是在这三种状态间来回切换; \7. M 的状态可以简化为只有两种:自旋和非自旋;自旋状态,表示 M 绑定了 P 又没有获取 G;非自旋状态,表示正在执行 Go 代码中,或正在进入系统调用,或空闲; \8. P 结构体中最重要的,是持有一个可运行 G 的长度为 256 的本地环形队列,可以通过 CAS 的方式无锁访问,跟需要加锁访问的全局队列 schedt.runq 相对应; \9. 调度器的启动逻辑是:初始化 g0 和 m0,并将二者互相绑定, m0 是程序启动后的初始线程,g0 是 m0 线程的系统栈代表的 G 结构体,负责普通 G 在 M 上的调度切换 --> runtime.schedinit():负责M、P 的初始化过程,分别调用runtime.mcommoninit() 初始化 M 的全局队列allm 、调用 runtime.procresize() 初始化全局 P 队列 allp --> runtime.newproc():负责获取空闲的 G 或创建新的 G --> runtime.mstart() 启动调度循环;; \10. 调度器的循环逻辑是:运行函数 schedule() --> 通过 runtime.globrunqget() 从全局队列、通过 runtime.runqget() 从 P 本地队列、 runtime.findrunnable 从各个地方,获取一个可执行的 G --> 调用 runtime.execute() 执行 G --> 调用 runtime.gogo() 在汇编代码层面上真正执行G --> 调用 runtime.goexit0() 执行 G 的清理工作,重新将 G 加入 P 的空闲队列 --> 调用 runtime.schedule() 进入下一次调度循环。1. GMP调度模型的设计思想1.1 传统多线程的问题在现代的操作系统中,为了提高并发处理任务的能力,一个 CPU 核上通常会运行多个线程,多个线程的创建、切换使用、销毁开销通常较大: 1)一个内核线程的大小通常达到1M,因为需要分配内存来存放用户栈和内核栈的数据; 2)在一个线程执行系统调用(发生 IO 事件如网络请求或读写文件)不占用 CPU 时,需要及时让出 CPU,交给其他线程执行,这时会发生线程之间的切换; 3)线程在 CPU 上进行切换时,需要保持当前线程的上下文,将待执行的线程的上下文恢复到寄存器中,还需要向操作系统内核申请资源; 在高并发的情况下,大量线程的创建、使用、切换、销毁会占用大量的内存,并浪费较多的 CPU 时间在非工作任务的执行上,导致程序并发处理事务的能力降低。 图1.1 传统多线程之间的切换开销较大1.2 Go语言早期引入的 GM 模型为了解决传统内核级的线程的创建、切换、销毁开销较大的问题,Go 语言将线程分为了两种类型:内核级线程 M (Machine),轻量级的用户态的协程 Goroutine,至此,Go 语言调度器的三个核心概念出现了两个: M: Machine的缩写,代表了内核线程 OS Thread,CPU调度的基本单元; G: Goroutine的缩写,用户态、轻量级的协程,一个 G 代表了对一段需要被执行的 Go 语言程序的封装;每个 Goroutine 都有自己独立的栈存放自己程序的运行状态;分配的栈大小 2KB,可以按需扩缩容; 图1.2 Go将线程拆分为内核线程 M 和用户线程 G 在早期,Go 将传统线程拆分为了 M 和 G 之后,为了充分利用轻量级的 G 的低内存占用、低切换开销的优点,会在当前一个M上绑定多个 G,某个正在运行中的 G 执行完成后,Go 调度器会将该 G 切换走,将其他可以运行的 G 放入 M 上执行,这时一个 Go 程序中只有一个 M 线程: 图1.3 多个 G 对应一个 M 这个方案的优点是用户态的 G 可以快速切换,不会陷入内核态,缺点是每个 Go 程序都用不了硬件的多核加速能力,并且 G 阻塞会导致跟 G 绑定的 M 阻塞,其他 G 也用不了 M 去执行自己的程序了。 为了解决这些不足,Go 后来快速上线了多线程调度器: 图1.4 多个 M 对应多个 G 每个Go程序,都有多个 M 线程对应多个 G 协程,该方案有以下缺点: 1)全局锁、中心化状态带来的锁竞争导致的性能下降; 2)M 会频繁交接 G,导致额外开销、性能下降;每个 M 都得能执行任意的 runnable 状态的 G; 3)每个 M 都需要处理内存缓存,导致大量的内存占用并影响数据局部性; 4)系统调用频繁阻塞和解除阻塞正在运行的线程,增加了额外开销;1.3 当前高效的 GMP 模型为了解决多线程调度器的问题,Go 开发者 Dmitry Vyokov 在已有 G、M 的基础上,引入了 P 处理器,由此产生了当前 Go 中经典的 GMP 调度模型。 P:Processor的缩写,代表一个虚拟的处理器,它维护一个局部的可运行的 G 队列,可以通过 CAS 的方式无锁访问,工作线程 M 优先使用自己的局部运行队列中的 G,只有必要时才会去访问全局运行队列,这大大减少了锁冲突,提高了大量 G 的并发性。每个 G 要想真正运行起来,首先需要被分配一个 P。 如图 1.5 所示,是当前 Go 采用的 GMP 调度模型。可运行的 G 是通过处理器 P 和线程 M 绑定起来的,M 的执行是由操作系统调度器将 M 分配到 CPU 上实现的,Go 运行时调度器负责调度 G 到 M 上执行,主要在用户态运行,跟操作系统调度器在内核态运行相对应。 图1.5 当前高效的GMP调度模型 需要说明的是,Go 调度器也叫 Go 运行时调度器,或 Goroutine 调度器,指的是由运行时在用户态提供的多个函数组成的一种机制,目的是为了高效地调度 G 到 M上去执行。可以跟操作系统的调度器 OS Scheduler 对比来看,后者负责将 M 调度到 CPU 上运行。从操作系统层面来看,运行在用户态的 Go 程序只是一个请求和运行多个线程 M 的普通进程,操作系统不会直接跟上层的 G 打交道。 至于为什么不直接将本地队列放在 M 上、而是要放在 P 上呢? 这是因为当一个线程 M 阻塞(可能执行系统调用或 IO请求)的时候,可以将和它绑定的 P 上的 G 转移到其他线程 M 去执行,如果直接把可运行 G 组成的本地队列绑定到 M,则万一当前 M 阻塞,它拥有的 G 就不能给到其他 M 去执行了。 基于 GMP 模型的 Go 调度器的核心思想是: \1. 尽可能复用线程 M:避免频繁的线程创建和销毁; \2. 利用多核并行能力:限制同时运行(不包含阻塞)的 M 线程数为 N,N 等于 CPU 的核心数目,这里通过设置 P 处理器的个数为 GOMAXPROCS 来保证,GOMAXPROCS 一般为 CPU 核数,因为 M 和 P 是一一绑定的,没有找到 P 的 M 会放入空闲 M 列表,没有找到 M 的 P 也会放入空闲 P 列表; \3. Work Stealing 任务窃取机制:M 优先执行其所绑定的 P 的本地队列的 G,如果本地队列为空,可以从全局队列获取 G 运行,也可以从其他 M 偷取 G 来运行;为了提高并发执行的效率,M 可以从其他 M 绑定的 P 的运行队列偷取 G 执行,这种 GMP 调度模型也叫任务窃取调度模型,这里,任务就是指 G; \4. Hand Off 交接机制:M 阻塞,会将 M 上 P 的运行队列交给其他 M 执行,交接效率要高,才能提高 Go 程序整体的并发度; \5. 基于协作的抢占机制:每个真正运行的G,如果不被打断,将会一直运行下去,为了保证公平,防止新创建的 G 一直获取不到 M 执行造成饥饿问题,Go 程序会保证每个 G 运行10ms 就要让出 M,交给其他 G 去执行; \6. 基于信号的真抢占机制:尽管基于协作的抢占机制能够缓解长时间 GC 导致整个程序无法工作和大多数 Goroutine 饥饿问题,但是还是有部分情况下,Go调度器有无法被抢占的情况,例如,for 循环或者垃圾回收长时间占用线程,为了解决这些问题, Go1.14 引入了基于信号的抢占式调度机制,能够解决 GC 垃圾回收和栈扫描时存在的问题。2. 多图详解几种常见的调度场景在进入 GMP 调度模型的数据结构和源码之前,可以先用几张图形象的描述下 GMP 调度机制的一些场景,帮助理解 GMP 调度器为了保证公平性、可扩展性、及提高并发效率,所设计的一些机制和策略。 1)创建 G: 正在 M1 上运行的P,有一个G1,通过go func() 创建 G2 后,由于局部性,G2优先放入P的本地队列; 图2.1 正在M1上运行的G1通过go func() 创建 G2 后,由于局部性,G2优先放入P的本地队列 2)G 运行完成后:M1 上的 G1 运行完成后(调用goexit()函数),M1 上运行的 Goroutine 会切换为 G0,G0 负责调度协程的切换(运行schedule() 函数),从 M1 上 P 的本地运行队列获取 G2 去执行(函数execute());注意:这里 G0 是程序启动时的线程 M(也叫M0)的系统栈表示的 G 结构体,负责 M 上 G 的调度; 图2.2 M1 上的 G1 运行完会切换到 P 本地队列的 G2 运行 3)M 上创建的 G 个数大于本地队列长度时:如果 P 本地队列最多能存 4 个G(实际上是256个),正在 M1 上运行的 G2 要通过go func()创建 6 个G,那么,前 4 个G 放在 P 本地队列中,G2 创建了第 5 个 G(G7)时,P 本地队列中前一半和 G7 一起打乱顺序放入全局队列,P 本地队列剩下的 G 往前移动,G2 创建的第 6 个 G(G8)时,放入 P 本地队列中,因为还有空间; 图2.3 M1上的G2要创建的G个数多于P本地队列能够存放的G个数时 4)M 的自旋状态:创建新的 G 时,运行的 G 会尝试唤醒其他空闲的 M 绑定 P 去执行,如果 G2 唤醒了M2,M2 绑定了一个 P2,会先运行 M2 的 G0,这时 M2 没有从 P2 的本地队列中找到 G,会进入自旋状态(spinning),自旋状态的 M2 会尝试从全局空闲线程队列里面获取 G,放到 P2 本地队列去执行,获取的数量满足公式:n = min(len(globrunqsize)/GOMAXPROCS + 1, len(localrunsize/2)),含义是每个P应该从全局队列承担的 G 数量,为了提高效率,不能太多,要给其他 P 留点; 图2.4 创建新的 G 时,运行的G会尝试唤醒其他空闲的M绑定P去执行 5)任务窃取机制:自旋状态的 M 会寻找可运行的 G,如果全局队列为空,则会从其他 P 偷取 G 来执行,个数是其他 P 运行队列的一半; 图2.5 自旋状态的 M 会寻找可运行的 G,如果全局队列为空,则会从其他 P 偷取 G 来执行 6)G 发生系统调用时:如果 G 发生系统调度进入阻塞,其所在的 M 也会阻塞,因为会进入内核状态等待系统资源,和 M 绑定的 P 会寻找空闲的 M 执行,这是为了提高效率,不能让 P 本地队列的 G 因所在 M 进入阻塞状态而无法执行;需要说明的是,如果是 M 上的 G 进入 Channel 阻塞,则该 M 不会一起进入阻塞,因为 Channel 数据传输涉及内存拷贝,不涉及系统资源等待; 图2.6 如果 G 发生阻塞,其所在的 M 也会阻塞,和 M 绑定的 P 会寻找空闲的 M 执行 7)G 退出系统调用时:如果刚才进入系统调用的 G2 解除了阻塞,其所在的 M1 会寻找 P 去执行,优先找原来的 P,发现没有找到,则其上的 G2 会进入全局队列,等其他 M 获取执行,M1 进入空闲队列; 图 2.7 当 G 解除阻塞时,所在的 M会寻找 P 去执行,如果没有找到,则 G 进入全局队列,M 进入空闲队列3. GMP的数据结构和各种状态3.1 G 的数据结构和状态G 的数据结构是:// src/runtime/runtime2.go type g struct { stack stack // 描述了当前 Goroutine 的栈内存范围 [stack.lo, stack.hi) stackguard0 uintptr // 用于调度器抢占式调度 _panic *_panic // 最内侧的 panic 结构体 _defer *_defer // 最内侧的 defer 延迟函数结构体 m *m // 当前 G 占用的线程,可能为空 sched gobuf // 存储 G 的调度相关的数据 atomicstatus uint32 // G 的状态 goid int64 // G 的 ID waitreason waitReason //当状态status==Gwaiting时等待的原因 preempt bool // 抢占信号 preemptStop bool // 抢占时将状态修改成 `_Gpreempted` preemptShrink bool // 在同步安全点收缩栈 lockedm muintptr //G 被锁定只能在这个 m 上运行 waiting *sudog // 这个 g 当前正在阻塞的 sudog 结构体 ...... } G 的主要字段有: ​ stack:描述了当前 Goroutine 的栈内存范围 [stack.lo, stack.hi); ​ stackguard0: 可以用于调度器抢占式调度;preempt,preemptStop,preemptShrink跟抢占相关; ​ defer 和 panic:分别记录这个 G 最内侧的panic和 _defer结构体; ​ m:记录当前 G 占用的线程 M,可能为空; ​ atomicstatus:表示G 的状态; ​ sched:存储 G 的调度相关的数据; ​ goid:表示 G 的 ID,对开发者不可见; 需要展开描述的是sched 字段的 runtime.gobuf 结构体:type gobuf struct { sp uintptr // 栈指针 pc uintptr // 程序计数器,记录G要执行的下一条指令位置 g guintptr // 持有 runtime.gobuf 的 G ret uintptr // 系统调用的返回值 ...... } 这些字段会在调度器将当前 G 切换离开 M 和调度进入 M 执行程序时用到,栈指针 sp 和程序计数器 pc 用来存放或恢复寄存器中的值,改变程序执行的指令。 结构体 runtime.g 的 atomicstatus 字段存储了当前 G 的状态,G 可能处于以下状态:const ( // _Gidle 表示 G 刚刚被分配并且还没有被初始化 _Gidle = iota // 0 // _Grunnable 表示 G 没有执行代码,没有栈的所有权,存储在运行队列中 _Grunnable // 1 // _Grunning 可以执行代码,拥有栈的所有权,被赋予了内核线程 M 和处理器 P _Grunning // 2 // _Gsyscall 正在执行系统调用,拥有栈的所有权,没有执行用户代码,被赋予了内核线程 M 但是不在运行队列上 _Gsyscall // 3 // _Gwaiting 由于运行时而被阻塞,没有执行用户代码并且不在运行队列上,但是可能存在于 Channel 的等待队列上 _Gwaiting // 4 // _Gdead 没有被使用,没有执行代码,可能有分配的栈 _Gdead // 6 // _Gcopystack 栈正在被拷贝,没有执行代码,不在运行队列上 _Gcopystack // 8 // _Gpreempted 由于抢占而被阻塞,没有执行用户代码并且不在运行队列上,等待唤醒 _Gpreempted // 9 // _Gscan GC 正在扫描栈空间,没有执行代码,可以与其他状态同时存在 _Gscan = 0x1000 ...... ) 其中主要的六种状态是: ​ Gidle:G 被创建但还未完全被初始化; ​ Grunnable:当前 G 为可运行的,正在等待被运行; ​ Grunning:当前 G 正在被运行; ​ Gsyscall:当前 G 正在被系统调用; ​ Gwaiting:当前 G 正在因某个原因而等待; ​ Gdead:当前 G 完成了运行; 图3.1描述了G从创建到结束的生命周期中经历的各种状态变化过程: 图3.1 G的状态变化 虽然 G 在运行时中定义的状态较多且复杂,但是我们可以将这些不同的状态聚合成三种:等待中、可运行、运行中,分别由_Gwaiting、_Grunnable、_Grunning 三种状态表示,运行期间大部分情况是在这三种状态来回切换: ​ 等待中:G 正在等待某些条件满足,例如:系统调用结束等,包括 _Gwaiting、_Gsyscall 几个状态; ​ 可运行:G 已经准备就绪,可以在线程 M 上运行,如果当前程序中有非常多的 G,每个 G 就可能会等待更多的时间,即 _Grunnable; ​ 运行中:G 正在某个线程 M 上运行,即 _Grunning。3.2 M 的数据结构M 的数据结构是:// src/runtime/runtime2.go type m struct { g0 *g // 持有调度栈的 G gsignal *g // 处理 signal 的 g tls [tlsSlots]uintptr // 线程本地存储 mstartfn func() // M的起始函数,go语句携带的那个函数 curg *g // 在当前线程上运行的 G p puintptr // 执行 go 代码时持有的 p (如果没有执行则为 nil) nextp puintptr // 用于暂存与当前 M 有潜在关联的 P oldp puintptr // 执行系统调用前绑定的 P spinning bool // 表示当前 M 是否正在寻找 G,在寻找过程中 M 处于自旋状态 lockedg guintptr // 表示与当前 M 锁定的那个 G ..... } M 的字段众多,其中最重要的为下面几个: ​ g0: Go 运行时系统在启动之初创建的,用来调度其他 G 到 M 上; ​ mstartfn:表示M的起始函数,其实就是go 语句携带的那个函数; ​ curg:存放当前正在运行的 G 的指针; ​ p:指向当前与 M 关联的那个 P; ​ nextp:用于暂存于当前 M 有潜在关联的 P; ​ spinning:表示当前 M 是否正在寻找 G,在寻找过程中 M 处于自旋状态; ​ lockedg:表示与当前M锁定的那个 G,运行时系统会把 一个 M 和一个 G 锁定,一旦锁定就只能双方相互作用,不接受第三者; M 并没有像 G 和 P 一样的状态标记, 但可以认为一个 M 有以下的状态: ​ 自旋中(spinning): M 正在从运行队列获取 G, 这时候 M 会拥有一个 P; ​ 执行go代码中: M 正在执行go代码, 这时候 M 会拥有一个 P; ​ 执行原生代码中: M 正在执行原生代码或者阻塞的syscall, 这时M并不拥有P; ​ 休眠中: M 发现无待运行的 G 时会进入休眠, 并添加到空闲 M 链表中, 这时 M 并不拥有 P。3.3 P 的数据结构P 的数据结构是:// src/runtime/runtime2.go type p struct { status uint32 // p 的状态 pidle/prunning/... schedtick uint32 // 每次执行调度器调度 +1 syscalltick uint32 // 每次执行系统调用 +1 m muintptr // 关联的 m mcache *mcache // 用于 P 所在的线程 M 的内存分配的 mcache deferpool []*_defer // 本地 P 队列的 defer 结构体池 // 可运行的 Goroutine 队列,可无锁访问 runqhead uint32 runqtail uint32 runq [256]guintptr // 线程下一个需要执行的 G runnext guintptr // 空闲的 G 队列,G 状态 status 为 _Gdead,可重新初始化使用 gFree struct { gList n int32 } ...... } 最主要的数据结构是 status 表示 P 的不同的状态,而 runqhead、runqtail 和 runq 三个字段表示处理器持有的运行队列,是一个长度为256的环形队列,其中存储着待执行的 G 列表,runnext 中是线程下一个需要执行的 G;gFree 存储 P 本地的状态为_Gdead 的空闲的 G,可重新初始化使用。 P 结构体中的状态 status 字段会是以下五种中的一种: ​ _Pidle:P 没有运行用户代码或者调度器,被空闲队列或者改变其状态的结构持有,运行队列为空; ​ _Prunning:被线程 M 持有,并且正在执行用户代码或者调度器; ​ _Psyscall:没有执行用户代码,当前线程陷入系统调用; ​ _Pgcstop:被线程 M 持有,当前处理器由于垃圾回收被停止; ​ _Pdead:当前 P 已经不被使用; P 的五种状态之间的转化关系如图 3.2 所示: 图3.2 P的状态变化3.4 schedt 的数据结构调度器的schedt结构体存储了全局的 G 队列,空闲的 M 列表和 P 列表:// src/runtime/runtime2.go type schedt struct { lock mutex // schedt的锁 midle muintptr // 空闲的M列表 nmidle int32 // 空闲的M列表的数量 nmidlelocked int32 // 被锁定正在工作的M数 mnext int64 // 下一个被创建的 M 的 ID maxmcount int32 // 能拥有的最大数量的 M pidle puintptr // 空闲的 P 链表 npidle uint32 // 空闲 P 数量 nmspinning uint32 // 处于自旋状态的 M 的数量 // 全局可执行的 G 列表 runq gQueue runqsize int32 // 全局可执行 G 列表的大小 // 全局 _Gdead 状态的空闲 G 列表 gFree struct { lock mutex stack gList // Gs with stacks noStack gList // Gs without stacks n int32 } // sudog结构的集中存储 sudoglock mutex sudogcache *sudog // 有效的 defer 结构池 deferlock mutex deferpool *_defer ...... } 除了上面的四个结构体,还有一些全局变量:// src/runtime/runtime2.go var ( allm *m // 所有的 M gomaxprocs int32 // P 的个数,默认为 ncpu 核数 ncpu int32 ...... sched schedt // schedt 全局结构体 newprocs int32 allpLock mutex // 全局 P 队列的锁 allp []*p // 全局 P 队列,个数为 gomaxprocs ...... } 此外,src/runtime/proc.go 文件有两个全局变量:// src/runtime/proc.go var ( m0 m // 进程启动后的初始线程 g0 g // 代表着初始线程的stack ...... ) 到这里,G、M、P、schedt结构体和全局变量都描述完毕,GMP 的全部队列如下表3-1所示: 表3-1 GMP的队列 中文名源码的名称作用域简要说明全局M列表runtime.allm运行时系统存放所有M全局P列表runtime.allp运行时系统存放所有P调度器中的空闲M列表runtime.schedt.midle调度器存放空闲M调度器中的空闲P列表runtime.schedt.pidle调度器存放空闲P调度器中的可运行G队列runtime.schedt.runq调度器存放可运行GP的本地可运行G队列runtime.p.runq本地P存放当前P中的可运行G调度器中的空闲G列表runtime.schedt.gfree调度器存放空闲的GP中的空闲G列表runtime.p.gfree本地P存放当前P中的空闲G4. 调度器的启动4.1 程序启动流程Go 程序一启动,Go 的运行时 runtime 自带的调度器 scheduler 就开始启动了。 对于一个最简单的Go程序:package main import "fmt" func main() { fmt.Println("hello world") } 通过 gdb或dlv的方式调试,会发现程序的真正入口不是在 runtime.main,对 AMD64 架构上的 Linux 和 macOS 服务器来说,分别在runtime包的 src/runtime/rt0_linux_amd64.s 和 src/runtime/rt0_darwin_amd64.s:TEXT _rt0_amd64_linux(SB),NOSPLIT,$-8 JMP _rt0_amd64(SB) TEXT _rt0_amd64_darwin(SB),NOSPLIT,$-8 JMP _rt0_amd64(SB) 两者均跳转到了 src/runtime/asm_amd64.s 包的 _rt0_amd64 函数:TEXT _rt0_amd64(SB),NOSPLIT,$-8 MOVQ 0(SP), DI // argc LEAQ 8(SP), SI // argv JMP runtime·rt0_go(SB) _rt0_amd64 函数调用了 src/runtime/asm_arm64.s 包的 runtime·rt0_go 函数:TEXT runtime·rt0_go(SB),NOSPLIT|TOPFRAME,$0 ...... // 初始化g0 MOVD $runtime·g0(SB), g ...... // 初始化 m0 MOVD $runtime·m0(SB), R0 // 绑定 g0 和 m0 MOVD g, m_g0(R0) MOVD R0, g_m(g) ...... BL runtime·schedinit(SB) // 调度器初始化 // 创建一个新的 goroutine 来启动程序 MOVD $runtime·mainPC(SB), R0 // main函数入口 ....... BL runtime·newproc(SB) // 负责根据主函数即 main 的入口地址创建可被运行时调度的执行单元goroutine ....... // 开始启动调度器的调度循环 BL runtime·mstart(SB) ...... DATA runtime·mainPC+0(SB)/8,$runtime·main<ABIInternal>(SB) // main函数入口地址 GLOBL runtime·mainPC(SB),RODATA,$8 Go程序的真正启动函数 runtime·rt0_go 主要做了几件事: 1)初始化 g0 和 m0,并将二者互相绑定, m0 是程序启动后的初始线程,g0 是 m0 的系统栈代表的 G 结构体,负责普通 G 在M 上的调度切换; 2)schedinit:进行各种运行时组件初始化工作,这包括我们的调度器与内存分配器、回收器的初始化; 3)newproc:负责根据主函数即 main 的入口地址创建可被运行时调度的执行单元; 4)mstart:开始启动调度器的调度循环; 阅读 Go 调度器的源码,需要先从整体结构上对其有个把握,Go 程序启动后的调度器主逻辑如图 4.1 所示: 图4.1 调度器主逻辑 下面分为两部分来分析调度器的原理:调度器的启动和调度循环。4.2 调度器的启动调度器启动函数在 src/runtime/proc.go 包的 schedinit() 函数:// src/runtime/proc.go // 调度器初始化 func schedinit() { ...... _g_ := getg() ...... // 设置机器线程数M最大为10000 sched.maxmcount = 10000 ...... // 栈、内存分配器相关初始化 stackinit() // 初始化栈 mallocinit() // 初始化内存分配器 ...... // 初始化当前系统线程 M0 mcommoninit(_g_.m, -1) ...... // GC初始化 gcinit() ...... // 设置P的值为GOMAXPROCS个数 procs := ncpu if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 { procs = n } // 调用procresize调整 P 列表 if procresize(procs) != nil { throw("unknown runnable goroutine during bootstrap") } ...... } schedinit() 函数会设置 M 最大数量为10000,实际中不会达到;会分别调用stackinit() 、mallocinit() 、mcommoninit() 、gcinit() 等执行 goroutine栈初始化、进行内存分配器初始化、进行系统线程M0的初始化、进行GC垃圾回收器的初始化;接着,将 P 个数设置为 GOMAXPROCS 的值,即程序能够同时运行的最大处理器数,最后会调用 runtime.procresize()函数初始化 P 列表。 图4.2 runtime.schedinit() 函数逻辑 schedinit() 函数负责M、P、G 的初始化过程。M/P/G 彼此的初始化顺序遵循:mcommoninit、procresize、newproc,他们分别负责初始化 M 资源池(allm)、P 资源池(allp)、G 的运行现场(g.sched)以及调度队列(p.runq)。 mcommoninit() 函数主要负责对 M0 进行一个初步的初始化,并将其添加到 schedt 全局结构体中,这里访问 schedt 会加锁:// src/runtime/proc.go func mcommoninit(mp *m, id int64) { ...... lock(&sched.lock) if id >= 0 { mp.id = id } else { // mReserveID() 会返回 sched.mnext 给当前 m,并对 sched.mnext++,记录新增加的这个 M 到 schedt 全局结构体 mp.id = mReserveID() } ...... // 添加到 allm 中 mp.alllink = allm // 等价于 allm = mp atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp)) unlock(&sched.lock) ...... } runtime.procresize()函数的逻辑是:// src/runtime/proc.go func procresize(nprocs int32) *p { ...... // 获取先前的 P 个数 old := gomaxprocs ...... // 如果全局变量 allp 切片中的处理器数量少于期望数量,对 allp 扩容 if nprocs > int32(len(allp)) { // 加锁 lock(&allpLock) if nprocs <= int32(cap(allp)) { // 如果要达到的 P 个数 nprocs 小于当前全局 P 切片到容量 allp = allp[:nprocs] // 在当前全局 P 切片上截取前 nprocs 个 P } else { // 否则,调大了,超出全局 P 切片的容量,创建容量为 nprocs 的新的 P 切片 nallp := make([]*p, nprocs) // 将原有的 p 复制到新创建的 nallp 中 copy(nallp, allp[:cap(allp)]) allp = nallp // 新的 nallp 切片赋值给旧的 allp } ...... unlock(&allpLock) } // 使用 new 创建新的 P 结构体并调用 runtime.p.init 初始化刚刚扩容的allp列表里的 P for i := old; i < nprocs; i++ { pp := allp[i] // 如果 p 是新创建的(新创建的 p 在数组中为 nil),则申请新的 P 对象 if pp == nil { pp = new(p) } pp.init(i) atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp)) } _g_ := getg() // 当前 G 的 M 上的 P 不为空,并且其 id 小于 nprocs,说明 ID 有效,则可以继续使用当前 G 的 P if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs { // 继续使用当前 P, 其状态设置为 _Prunning _g_.m.p.ptr().status = _Prunning _g_.m.p.ptr().mcache.prepareForSweep() } else { // 否则,释放当前 P 并获取 allp[0] if _g_.m.p != 0 { ...... _g_.m.p.ptr().m = 0 } _g_.m.p = 0 // 将处理器 allp[0] 绑定到当前 M p := allp[0] p.m = 0 p.status = _Pidle // P 状态设置为 _Pidle acquirep(p) // 将allp[0]绑定到当前的 M if trace.enabled { traceGoStart() } } mcache0 = nil // 调用 runtime.p.destroy 释放从未使用的 P for i := nprocs; i < old; i++ { p := allp[i] p.destroy() // 不能释放 p 本身,因为他可能在 m 进入系统调用时被引用 } // 裁剪 allp,保证allp长度与期望处理器数量相等 if int32(len(allp)) != nprocs { lock(&allpLock) allp = allp[:nprocs] idlepMask = idlepMask[:maskWords] timerpMask = timerpMask[:maskWords] unlock(&allpLock) } var runnablePs *p // 将除 allp[0] 之外的处理器 P 全部设置成 _Pidle 并加入到全局的空闲队列中 for i := nprocs - 1; i >= 0; i-- { p := allp[i] if _g_.m.p.ptr() == p { // 跳过当前 P continue } p.status = _Pidle // 设置 P 的状态为空闲状态 if runqempty(p) { pidleput(p) // 放入到全局结构体 schedt 的空闲 P 列表中 } else { p.m.set(mget()) // 如果有本地任务,则为其绑定一个 M p.link.set(runnablePs) runnablePs = p } } stealOrder.reset(uint32(nprocs)) var int32p *int32 = &gomaxprocs atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs)) return runnablePs // 返回所有包含本地任务的 P 链表 } runtime.procresize() 函数 的执行过程如下: 1)如果全局变量 allp 切片中的 P 数量少于期望数量,会对切片进行扩容; 2)使用 new 创建新的 P 结构体并调用 runtime.p.init 初始化刚刚扩容的 P; 3)通过指针将线程 m0 和处理器 allp[0] 绑定到一起; 4)调用 runtime.p.destroy 释放不再使用的 P 结构; 5)通过切片截断改变全局变量 allp 的长度,保证它与期望 P 数量相等; 6)将除 allp[0] 之外的处理器 P 全部设置成 _Pidle 并加入到全局 schedt 的空闲 P 队列中; runtime.procresize() 函数的逻辑如图 4.3 所示: 图4.3 runtime.procresize() 函数逻辑 runtime.procresize() 函数调用 runtime.p.init 初始化新创建的 P:// src/runtime/proc.go // 初始化 P func (pp *p) init(id int32) { pp.id = id // p 的 id 就是它在 allp 中的索引 pp.status = _Pgcstop // 新创建的 p 处于 _Pgcstop 状态 ...... // 为 P 分配 cache 对象,涉及对象分配 if pp.mcache == nil { if id == 0 { if mcache0 == nil { throw("missing mcache?") } pp.mcache = mcache0 } else { pp.mcache = allocmcache() } } ...... } 需要说明的是,mcache内存结构原来是在 M 上的,自从引入了 P 之后,就将该结构体移到了P上,这样,就不用每个 M 维护自己的内存分配 mcache,由于 P 在有 M 可以执行时才会移动到其他 M 上去,空闲的 M 无须分配内存,这种灵活性使整体线程的内存分配大大减少。4.3 怎样创建 G ?我们再回到 4.1 节对程序启动函数 runtime·rt0_go,有个动作是通过 runtime.newproc 函数创建 G,runtime.newproc 入参是 funcval 结构体函数,代表 go 关键字后面调用的函数:// src/runtime/proc.go // 创建G,并放入 P 的运行队列 func newproc(fn *funcval) { gp := getg() pc := getcallerpc() // 获取调用方 PC 寄存器值,即调用方程序要执行的下一个指令地址 // 用 g0 系统栈创建 Goroutine 对象 // 传递的参数包括 fn 函数入口地址, gp(g0),调用方 pc systemstack(func() { newg := newproc1(fn, gp, pc) // 调用 newproc1 获取 Goroutine 结构 _p_ := getg().m.p.ptr() // 获取当前 G 的 P runqput(_p_, newg, true) // 将新的 G 放入 P 的本地运行队列 if mainStarted { // M 启动时唤醒新的 P 执行 G wakep() } }) } runtime.newproce函数主要是调用 runtime.newproc1 获取新的 Goroutine 结构,将新的 G 放入P的运行队列,M 启动时唤醒新的 P 执行 G。 runtime.newproce函数的逻辑如图4.4所示: 图4.4 runtime.newproce() 函数逻辑 runtime.newproce1() 函数的逻辑是:// src/runtime/proc.go // 创建一个运行fn函数的goroutine func newproc1(fn *funcval, callergp *g, callerpc uintptr) *g { _g_ := getg() // 因为是在系统栈运行所以此时的 g 为 g0 if fn == nil { _g_.m.throwing = -1 // do not dump full stacks throw("go of nil func value") } acquirem() // 加锁,禁止这时 G 的 M 被抢占,因为它可以在一个局部变量中保存 P _p_ := _g_.m.p.ptr() // 获取 P newg := gfget(_p_) // 从 P 的空闲列表获取一个空闲的 G if newg == nil { // 找不到则创建 newg = malg(_StackMin) // 创建一个栈大小为 2K 的 G casgstatus(newg, _Gidle, _Gdead) // CAS 改变 G 的状态为_Gdead allgadd(newg) // 将 _Gdead 状态的 g 添加到 allg,这样 GC 不会扫描未初始化的栈 } ...... // 计算运行空间大小,对齐 totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize) totalSize = alignUp(totalSize, sys.StackAlign) sp := newg.stack.hi - totalSize // 确定 SP 和参数入栈位置 spArg := sp ...... // 清理、创建并初始化 G 的运行现场 memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched)) newg.sched.sp = sp newg.stktopsp = sp // 保存goexit的地址到sched.pc newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function newg.sched.g = guintptr(unsafe.Pointer(newg)) gostartcallfn(&newg.sched, fn) // 初始化 G 的基本状态 newg.gopc = callerpc newg.ancestors = saveAncestors(callergp) newg.startpc = fn.fn ...... // 将 G 的状态设置为_Grunnable casgstatus(newg, _Gdead, _Grunnable) ...... // 生成唯一的goid newg.goid = int64(_p_.goidcache) _p_.goidcache++ ...... // 释放对 M 加的锁 releasem(_g_.m) return newg } runtime.newproc1() 函数主要执行三个动作: 1)获取或者创建新的 Goroutine 结构体,会先从处理器的 gFree 列表中查找空闲的 Goroutine,如果不存在空闲的 Goroutine,会通过 runtime.malg 创建一个栈大小足够的新结构体,新创建的 G 的状态为_Gdead; 2)将传入的参数 callergp,callerpc,fn更新到 G 的栈上,初始化 G 的相关参数; 3)将 G 状态设置为 _Grunnable 状态,返回; runtime.newproc1() 函数的逻辑如图 4.5 所示: 图4.5 runtime.newproc1() 函数逻辑 runtime.newproc1() 函数主要调用 runtime.gfget() 函数 获取 G:// src/runtime/proc.go func gfget(_p_ *p) *g { retry: // 如果 P 的空闲列表 gFree 为空,sched 的的空闲列表 gFree 不为空 if _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) { lock(&sched.gFree.lock) // 从 sched 的 gFree 列表中移动 32 个 G 到 P 的 gFree 中 for _p_.gFree.n < 32 { gp := sched.gFree.stack.pop() if gp == nil { gp = sched.gFree.noStack.pop() if gp == nil { break } } sched.gFree.n-- _p_.gFree.push(gp) _p_.gFree.n++ } unlock(&sched.gFree.lock) goto retry } // 如果此时 P 的空闲列表还是为空,返回nil,说明无空闲的G gp := _p_.gFree.pop() if gp == nil { return nil } _p_.gFree.n-- // 设置 G 的栈空间 if gp.stack.lo == 0 { systemstack(func() { gp.stack = stackalloc(_FixedStack) }) gp.stackguard0 = gp.stack.lo + _StackGuard } else { ..... } return gp // 从 P 的空闲列表获取 G 返回 } runtime.gfget() 函数的主要逻辑是:当 P 的空闲列表 gFree 为空时,从 sched 持有的全局空闲列表 gFree 中移动最多 32个 G 到当前的 P 的空闲列表上;然后从 P 的 gFree 列表头返回一个 G;如果还是没有,则返回空,说明获取不到空闲的 G。 在 runtime.newproc1() 函数中,如果不存在空闲的 G,会通过 runtime.malg() 创建一个栈大小足够的新结构体:// src/runtime/proc.go // 创建一个新的 g 结构体 func malg(stacksize int32) *g { newg := new(g) if stacksize >= 0 { // 如果申请的堆栈大小大于 0,会通过 runtime.stackalloc 分配 2KB 的栈空间 stacksize = round2(_StackSystem + stacksize) systemstack(func() { newg.stack = stackalloc(uint32(stacksize)) }) newg.stackguard0 = newg.stack.lo + _StackGuard newg.stackguard1 = ^uintptr(0) *(*uintptr)(unsafe.Pointer(newg.stack.lo)) = 0 } return newg } 回到 runtime.newproce函数,在获取到 G 后,会调用 runtime.runqput() 函数将 G 放入 P 本地队列,或全局队列:// src/runtime/proc.go // 将 G 放入 P 的运行队列中 func runqput(_p_ *p, gp *g, next bool) { // 保持一定的随机性,不将当前 G 设置为 P 的下一个执行的任务 if randomizeScheduler && next && fastrandn(2) == 0 { next = false } if next { retryNext: // 将 G 放入到 P 的 runnext 变量中,作为下一个 P 执行的任务 oldnext := _p_.runnext if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) { goto retryNext } if oldnext == 0 { return } // 获取原来的 runnext 存储的 G,放入 P 本地运行队列,或全局队列 gp = oldnext.ptr() } retry: h := atomic.LoadAcq(&_p_.runqhead) // 获取 P 环形队列的头和尾部指针 t := _p_.runqtail // P 本地环形队列没有满,将 G 放入本地环形队列 if t-h < uint32(len(_p_.runq)) { _p_.runq[t%uint32(len(_p_.runq))].set(gp) atomic.StoreRel(&_p_.runqtail, t+1) return } // P 本地环形队列已满,将 G 放入全局队列 if runqputslow(_p_, gp, h, t) { return } // 本地队列和全局队列没有满,则不会走到这里,否则循环尝试放入 goto retry } runtime.runqput() 函数的主要处理逻辑是: 1)保留一定的随机性,设置 next 为 false,即不将当前 G 设置为 P 的下一个执行的 G; 2)当 next 为 true 时,将 G 设置到 P 的 runnext 作为 P 下一个执行的任务; 3)当 next 为 false 并且本地运行队列还有剩余空间时,将 Goroutine 加入处理器持有的本地运行队列; 4)当处理器的本地运行队列已经没有剩余空间时,就会把本地队列中的一部分 G 和待加入的 G 通过 runtime.runqputslow 添加到调度器持有的全局运行队列上; runtime.runqput() 函数的逻辑如图 4.6 所示: 图4.6 runtime.runqput() 函数的逻辑 runtime.runqputslow() 函数的逻辑如下:// 将 G 和 P 本地队列的一部分放入全局队列 func runqputslow(_p_ *p, gp *g, h, t uint32) bool { var batch [len(_p_.runq)/2 + 1]*g // 初始化一个本地队列长度一半 + 1 的 G 列表 batch // 首先,从 P 本地队列中获取一部分 G 放入初始化的列表 batch n := t - h n = n / 2 if n != uint32(len(_p_.runq)/2) { throw("runqputslow: queue is not full") } for i := uint32(0); i < n; i++ { // 将 P 本地环形队列的前一半 G 放入batch batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr() } if !atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume return false } batch[n] = gp // 将传入的 G 放入列表 batch 的尾部 if randomizeScheduler { // 打乱 batch 列表中G的顺序 for i := uint32(1); i <= n; i++ { j := fastrandn(i + 1) batch[i], batch[j] = batch[j], batch[i] } } // 将 batch列表的 G 串成一个链表. for i := uint32(0); i < n; i++ { batch[i].schedlink.set(batch[i+1]) } var q gQueue // 将 batch 列表设置成 gQueue 队列 q.head.set(batch[0]) q.tail.set(batch[n]) // 现在把 gQueue 队列放入全局队列 lock(&sched.lock) globrunqputbatch(&q, int32(n+1)) unlock(&sched.lock) return true } runtime.runqputslow() 函数会把 P 本地环形队列的前一半 G 获取出来,跟传入的 G 组成一个列表,打乱顺序,再放入全局队列。 综上所属,用下图表示调度器启动流程: 图4.7 调度器启动流程5. 调度循环我们再回到5.1节的程序启动流程,runtime·rt0_go 函数在调用 runtime.schedinit() 初始化好了调度器、调用 runtime.newproc()创建了main函数的 G 后,会调用runtime.mstart() 函数启动 M 去执行G。TEXT runtime·mstart(SB),NOSPLIT|TOPFRAME,$0 CALL runtime·mstart0(SB) RET // not reached runtime.mstart() 是用汇编写的,会直接调用 runtime.mstart0() 函数:// src/runtime/proc.go func mstart0() { _g_ := getg() ...... // 初始化 g0 的参数 _g_.stackguard0 = _g_.stack.lo + _StackGuard _g_.stackguard1 = _g_.stackguard0 mstart1() ...... mexit(osStack) } runtime.mstart0() 函数主要调用 runtime.mstart1():// src/runtime/proc.go func mstart1() { _g_ := getg() if _g_ != _g_.m.g0 { throw("bad runtime·mstart") } // 记录当前栈帧,便于其他调用复用,当进入 schedule 之后,再也不会回到 mstart1 _g_.sched.g = guintptr(unsafe.Pointer(_g_)) _g_.sched.pc = getcallerpc() _g_.sched.sp = getcallersp() asminit() minit() // 设置信号 handler;在 minit 之后,因为 minit 可以准备处理信号的的线程 if _g_.m == &m0 { mstartm0() } // 执行启动函数 if fn := _g_.m.mstartfn; fn != nil { fn() } // 如果当前 m 并非 m0,则要求绑定 p if _g_.m != &m0 { acquirep(_g_.m.nextp.ptr()) _g_.m.nextp = 0 } // 准备好后,开始调度循环,永不返回 schedule() } runtime.mstart1() 保存调度信息后,会调用 runtime.schedule() 进入调度循环,寻找一个可执行的 G 并执行。 循环调度主逻辑如图5.1所示: 图5.1 循环调度主逻辑 runtime.schedule() 函数的逻辑是:// src/runtime/proc.go func schedule() { _g_ := getg() if _g_.m.locks != 0 { throw("schedule: holding locks") } ...... top: pp := _g_.m.p.ptr() pp.preempt = false if sched.gcwaiting != 0 { // 如果需要 GC,不再进行调度 gcstopm() goto top } if pp.runSafePointFn != 0 { // 不等于0,说明在安全点 runSafePointFn() } // 如果 G 所在的 M 在自旋,说明其P运行队列为空,如果不为空,则应该甩出错误 if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) { throw("schedule: spinning with local work") } // 运行 P 上准备就绪的 Timer checkTimers(pp, 0) var gp *g var inheritTime bool ...... if gp == nil { // 说明不在 GC // 每调度 61 次,就检查一次全局队列,保证公平性;否则两个 Goroutine 可以通过互换,一直占领本地的 runqueue if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { lock(&sched.lock) gp = globrunqget(_g_.m.p.ptr(), 1) // 从全局队列中偷 g unlock(&sched.lock) } } if gp == nil { // 从 P 的本地队列获取 G gp, inheritTime = runqget(_g_.m.p.ptr()) } if gp == nil { gp, inheritTime = findrunnable() // 阻塞式查找可用 G } // M 这时候一定是获取到了G // 如果 M 是自旋状态,重置其状态到非自旋 if _g_.m.spinning { resetspinning() } ....... // 执行 G execute(gp, inheritTime) } runtime.schedule() 函数会从下面几个地方查找待执行的 Goroutine: 1)为了保证公平,当全局运行队列中有待执行的 G 时,通过 schedtick 对 61 取模,表示每 61 次会有一次从全局的运行队列中查找对应的 G,这样可以避免两个 G 在 P 本地队列互换一直占有本地队列; 2)调用 runtime.runqget() 函数从 P 本地的运行队列中获取待执行的 G; 3)如果前两种方法都没有找到 G,会通过 runtime.findrunnable() 进行阻塞地查找 G; runtime.schedule 函数从全局队列获取 G 的函数是 runtime.globrunqget() 函数:// 从全局队列获取 G func globrunqget(_p_ *p, max int32) *g { assertLockHeld(&sched.lock) // 如果全局队列没有 G,则直接返回 if sched.runqsize == 0 { return nil } // 计算n,表示从全局队列放入本地队列的 G 的个数 n := sched.runqsize/gomaxprocs + 1 if n > sched.runqsize { n = sched.runqsize } // n 不能超过取的要获取的max个数 if max > 0 && n > max { n = max } // 计算能不能用本地队列的一般放下 n 个 G,如果放不下,则 n 设为本地队列的一半 if n > int32(len(_p_.runq))/2 { n = int32(len(_p_.runq)) / 2 } sched.runqsize -= n // 拿到全局队列的队头作为返回的 G gp := sched.runq.pop() n-- // n计数减 1 // 继续取剩下的 n-1个全局队列 G 放入本地队列 for ; n > 0; n-- { gp1 := sched.runq.pop() runqput(_p_, gp1, false) } return gp } runtime.globrunqget() 函数会从全局队列获取 n 个 G,第一个 G 返回给调度器去执行,剩下的 n-1 个 G 放入本地队列,其中,n一般为全局队列长度 / P处理器个数 + 1,含义是平均每个 P 应该从全局队列中承担的 G 数量,且不能超过 P 本地长度的一半。 runtime.schedule() 函数调用 runtime.runqget() 函数从 P 本地的运行队列中获取待执行的 G:// 从 P 本地队列中获取 G func runqget(_p_ *p) (gp *g, inheritTime bool) { // 如果 P 有一个 runnext,则它就是下一个要执行的 G. next := _p_.runnext // 如果 runnext 不为空,而 CAS 失败, 则它又可能被其他 P 偷取了, // 因为其他 P 可以竞争机会到设置 runnext 为 0, 当前 P 只能设置该字段为非0 if next != 0 && _p_.runnext.cas(next, 0) { return next.ptr(), true } for { h := atomic.LoadAcq(&_p_.runqhead) //从本地环形队列头遍历 t := _p_.runqtail if t == h { // 头尾指针相等,表示本地队列为空 return nil, false } // 获取头部指针指向的 G gp := _p_.runq[h%uint32(len(_p_.runq))].ptr() if atomic.CasRel(&_p_.runqhead, h, h+1) { return gp, false } } } 本地队列的获取会先从 P 的 runnext 字段中获取,如果不为空则直接返回。如果 runnext 为空,那么从本地环形队列头指针遍历本地队列,取到了则返回。 阻塞式获取 G 的 runtime.findrunnable() 函数的整个逻辑看起来比较繁琐,其实无非是按这个顺序获取 G: local -> global -> netpoll -> steal -> local -> global -> netpoll:// 找到一个可运行的 G 去执行 // 会从其他 P 的运行队列偷取,从本地会全局队列获取,或从网络轮询器获取 func findrunnable() (gp *g, inheritTime bool) { _g_ := getg() top: _p_ := _g_.m.p.ptr() if sched.gcwaiting != 0 { // 如果在 gc,则休眠当前 m,直到复始后回到 top gcstopm() goto top } if _p_.runSafePointFn != 0 { // 不等于0,说明在安全点 runSafePointFn() } now, pollUntil, _ := checkTimers(_p_, 0) // 取本地队列 local runq,如果已经拿到,立刻返回 if gp, inheritTime := runqget(_p_); gp != nil { return gp, inheritTime } // 全局队列 global runq,如果已经拿到,立刻返回 if sched.runqsize != 0 { lock(&sched.lock) gp := globrunqget(_p_, 0) unlock(&sched.lock) if gp != nil { return gp, false } } // 从 netpoll 网络轮询器中尝试获取 G,优先级比从其他 P 偷取 G 要高 if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 { if list := netpoll(0); !list.empty() { // non-blocking gp := list.pop() injectglist(&list) casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } } // 自旋 M: 从其他 P 中窃取任务 G // 限制自旋 M 数量到忙碌P数量的一半. 避免一半P数量、并行机制很慢时的CPU消耗 procs := uint32(gomaxprocs) if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) { if !_g_.m.spinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } // 从其他 P 或 timer 中偷取G gp, inheritTime, tnow, w, newWork := stealWork(now) now = tnow if gp != nil { // Successfully stole. return gp, inheritTime } if newWork { // 可能有新的 timer 或 GC,重新开始 goto top } if w != 0 && (pollUntil == 0 || w < pollUntil) { // Earlier timer to wait for. pollUntil = w } } // 没有任何 work 可做。 // 如果我们在 GC mark 阶段,则可以安全的扫描并标记对象为黑色 // 然后便有 work 可做,运行 idle-time 标记而非直接放弃当前的 P。 if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) { node := (*gcBgMarkWorkerNode)(gcBgMarkWorkerPool.pop()) if node != nil { _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode gp := node.gp.ptr() casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } } ..... // 放弃当前的 P 之前,对 allp 做一个快照 allpSnapshot := allp idlepMaskSnapshot := idlepMask timerpMaskSnapshot := timerpMask // 准备归还 p,对调度器加锁 lock(&sched.lock) // 进入了 gc,回到顶部并停止 m if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 { unlock(&sched.lock) goto top } // 全局队列中又发现了任务 if sched.runqsize != 0 { gp := globrunqget(_p_, 0) // 赶紧偷掉返回 unlock(&sched.lock) return gp, false } if releasep() != _p_ { // 归还当前的 p throw("findrunnable: wrong p") } pidleput(_p_) // 将 p 放入 idle 链表 unlock(&sched.lock) // 完成归还,解锁 // 这里要非常小心: 线程从自旋到非自旋状态的转换,可能与新 Goroutine 的提交同时发生 wasSpinning := _g_.m.spinning if _g_.m.spinning { _g_.m.spinning = false // M 即将睡眠,状态不再是 spinning if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { throw("findrunnable: negative nmspinning") } // 再次检查所有的 runqueue _p_ = checkRunqsNoP(allpSnapshot, idlepMaskSnapshot) if _p_ != nil { acquirep(_p_) _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) goto top } // 再次检查 idle-priority GC work,和上面重新找 runqueue 的逻辑类似 _p_, gp = checkIdleGCNoP() if _p_ != nil { acquirep(_p_) _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) // Run the idle worker. _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } // 最后, 检查 timer creation pollUntil = checkTimersNoP(allpSnapshot, timerpMaskSnapshot, pollUntil) } // 再次检查 netpoll 网络轮询器,和上面重新找 runqueue 的逻辑类似 if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 { ...... lock(&sched.lock) _p_ = pidleget() unlock(&sched.lock) if _p_ == nil { injectglist(&list) } else { acquirep(_p_) if !list.empty() { gp := list.pop() injectglist(&list) casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } if wasSpinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } goto top } } else if pollUntil != 0 && netpollinited() { pollerPollUntil := int64(atomic.Load64(&sched.pollUntil)) if pollerPollUntil == 0 || pollerPollUntil > pollUntil { netpollBreak() } } stopm() // 真的什么都没找到,暂止当前的 m goto top } runtime.findrunnable 函数的主要工作是: 1)首先检查是是否正在进行 GC,如果是则休眠当前的 M ; 2)尝试从本地队列中取 G,如果取到,则直接返回,否则继续从全局队列中找 G,如果找到则直接返回; 3)检查 netpoll 网络轮询器中是否有 G,如果有,则直接返回; 4)如果此时仍然无法找到 G,则从其他 P 的本地队列中偷取;从其他 P 本地队列偷取的工作会执行四轮,如果找到 G,则直接返回; 5)所有的可能性都尝试过了,在准备休眠 M 之前,还要进行额外的检查; 6)首先检查此时是否是 GC mark 阶段,如果是,则直接返回 mark 阶段的 G; 7)如果仍然没有,则对当前的 P 进行快照,准备对调度器进行加锁; 8)当调度器被锁住后,仍然还需再次检查这段时间里是否有进入 GC,如果已经进入了 GC,则回到第一步,阻塞 M 并休眠; 9)如果又在全局队列中发现了 G,则直接返回; 10)当调度器被锁住后,我们彻底找不到任务了,则归还释放当前的 P,将其放入 idle 链表中,并解锁调度器; 11)当 M、P 已经解绑后,我们需要将 M 的状态切换出自旋状态,并减少 nmspinning; 12)此时仍然需要重新检查所有的队列,如果我们又在全局队列中发现了 g,则直接返回; 13)还需要再检查是否存在 poll 网络的 G,如果有,则直接返回; 14)什么也没找到,休眠当前的 M。 runtime.findrunnable 函数的逻辑如图 5.2 所示: 图5.2 runtime.findrunnable()函数逻辑 如果调度循环函数 runtime.schedule() 从通过 runtime.globrunqget() 从全局队列,通过 runtime.runqget() 从 P 本地队列,以及 runtime.findrunnable 从各个地方,获取到了一个可执行的 G, 则会调用 runtime.execute() 函数去执行它,它会通过 runtime.gogo() 将 G 调度到当前线程上开始真正执行,之后 runtime.gogo() 会调用 runtime.goexit(),并依次进入runtime.goexit1(),和 runtime.goexit0(),最后在 runtime.goexit0() 函数的结尾会再次调用 runtime.schedule() ,进入下一次调度循环。6. 总结总结的内容已经放在了开头的结论中了。 最近听到一句话:任何领域的顶尖高手,都是在花费大量时间和身心投入后,达到了用灵魂触碰到更高维度的真实存在的境界,而不是在用头脑在思考和工作,因此作出来的产品都极具美感、实用性和创造性,就好像偷取了上帝的创意一样。 在 Go 调度器的底层原理的学习中,不仅需要亲自花时间去分析源码的细节,更加要大量阅读 Go 开发者的文章,需要用心体会机制设计背后的原因。 Reference Go语言设计与实现6.5节调度器 https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-goroutine/#g Go语言原本6.3节MPG模型与并发调度单元 https://golang.design/under-the-hood/zh-cn/part2runtime/ch06sched/mpg/ Go调度器系列(3)图解调度原理 https://lessisbetter.site/2019/04/04/golang-scheduler-3-principle-with-graph/ Golang的协程调度器原理及GMP设计思想 https://www.yuque.com/aceld/golang/srxd6d 详解Go语言调度循环源码实现 https://www.luozhiyun.com/archives/448 golang源码分析之协程调度器底层实现( G、M、P) https://blog.csdn.net/qq_25870633/article/details/83445946 「译」Scheduling In Go Part I - OS Scheduler https://blog.lever.wang/golang/os_schedule/ 「译」Scheduling In Go Part II - Go Scheduler https://blog.lever.wang/golang/go_schedule/ 深入 golang -- GMP调度 https://zhuanlan.zhihu.com/p/502740833 深度解密 Go 语言之 scheduler https://qcrao.com/post/dive-int ## Publication Information - [okscan.eth](https://paragraph.com/@okscan/): Publication homepage - [All Posts](https://paragraph.com/@okscan/): More posts from this publication - [RSS Feed](https://api.paragraph.com/blogs/rss/@okscan): Subscribe to updates