唤醒工作线程

  1. 去尝试唤醒工作线程条件:atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0
    • atomic.Load(&sched.npidle) != 0:有空闲的P
    • atomic.Load(&sched.nmspinning) == 0:没有工作线程正在尝试从其他工作线程的本地队列偷取goroutine。(就是没有spinning自旋的goroutine时)
  2. 唤醒空闲的PMwakep()函数完成。

wakep()

  1. 尝试添加一个P来执行G。当G是可运行时调用(newproc, ready)。
  2. wakep()函数被调用的地方:【创建g时候,在newproc()函数中,就是go关键字】,【g放回P的时候,在ready()函数中】。
    • newproc()函数中,也就是go关键字时,runtime.main已启动时。(这种情况发生在go关键字时)
    • ready()函数中,该函数通过把需要唤醒的goroutine放入运行队列来唤醒它。(这种情况在g被挂在了其他地方时需要恢复到P中时)
  3. 也就是只要g被放入本地队列中,准备运行时都需要调用wakep()函数尝试利用空闲的MP来运行它。
  4. 文件位置:go1.19.3/src/runtime/proc.go
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447
// Tries to add one more P to execute G's.
// Called when a G is made runnable (newproc, ready).
func wakep() {
    // 没有空闲的P,直接返回。当前所有的P都在工作。因为P的数量是固定的。
    if atomic.Load(&sched.npidle) == 0 {
        return
    }
    
    // be conservative about spinning threads
    // 
    // 对自旋的线程持保守态度:
    // 	1. sched.nmspinning != 0:有其他线程正在自旋(就是在其他线程中去偷取g)
    //  2. !atomic.Cas(&sched.nmspinning, 0, 1):通过cas操作再次确认是否有其他工作线程处于spinning状态
    // 从进入wakep()判断到真正启动工作线程之前的这一段时间之内,如果已经有工作线程进入了spinning状态而在四处寻找需要运行的goroutine
    // 这样的话我们就没有必要再启动一个多余的工作线程出来了
    // 如果cas操作成功,则继续调用startm创建一个新的或唤醒一个处于睡眠状态的工作线程出来工作
    // 
    // 1. atomic.Load(&sched.nmspinning) != 0 成立:有其他工作线程正在自旋,直接return退出
    // 2. atomic.Load(&sched.nmspinning) == 0 成立:当前没有忙碌的工作线程
    //    atomic.Cas(&sched.nmspinning, 0, 1) == true:当前没有忙碌的工作线程,当前可以创建工作线程,并标记sched.nmspinning=1阻止后来者
    //    atomic.Cas(&sched.nmspinning, 0, 1) == false:当前有其他忙碌的工作线程,直接return退出
    if atomic.Load(&sched.nmspinning) != 0 || !atomic.Cas(&sched.nmspinning, 0, 1) {
        return
    }
    
    // 程序执行到这里说明:sched.nmspinning一定被标记为1了。
    startm(nil, true)
}

startm()

  1. 调度一些M来运行p(如果需要,创建一个M)。
  2. 如果p==nil,尝试得到一个空闲的p,如果没有空闲的p什么都不做。
  3. 可以使用m.p==nil运行,因此不允许写入障碍。
  4. 如果设置了spinning,则调用者增加了nmspinning,而startm将减少nmspinning或在新启动的M中设置m.spinning
  5. 传递nil的P的调用方必须从不可抢占的上下文中调用。见下面对acquirem
  6. 必须没有写障碍,因为这个可能没有P
  7. go:nowritebarrierrec:不允许编译器插入写屏障相关代码。
  8. 在抢占系统调用的P的时候该函数会被调用,并传入空闲的Pfalse参数。
  9. 参数:
    • _p_ *pnil表示没有指定P,否则指定P
    • spinning booltrue.sched.nmspinning的值在前面被加一了。false.没有加一。
  10. 文件位置:go1.19.3/src/runtime/proc.go
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354
2355
2356
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
// Schedules some M to run the p (creates an M if necessary).
// If p==nil, tries to get an idle P, if no idle P's does nothing.
// May run with m.p==nil, so write barriers are not allowed.
// If spinning is set, the caller has incremented nmspinning and startm will
// either decrement nmspinning or set m.spinning in the newly started M.
//
// Callers passing a non-nil P must call from a non-preemptible context. See
// comment on acquirem below.
//
// Must not have write barriers because this may be called without a P.
//
//go:nowritebarrierrec
func startm(_p_ *p, spinning bool) {
    // Disable preemption.
    //
    // Every owned P must have an owner that will eventually stop it in the
    // event of a GC stop request. startm takes transient ownership of a P
    // (either from argument or pidleget below) and transfers ownership to
    // a started M, which will be responsible for performing the stop.
    //
    // Preemption must be disabled during this transient ownership,
    // otherwise the P this is running on may enter GC stop while still
    // holding the transient P, leaving that P in limbo and deadlocking the
    // STW.
    //
    // Callers passing a non-nil P must already be in non-preemptible
    // context, otherwise such preemption could occur on function entry to
    // startm. Callers passing a nil P may be preemptible, so we must
    // disable preemption before acquiring a P from pidleget below.
    mp := acquirem()    // 禁止当前M被抢占
    lock(&sched.lock)   // 锁住全局sched
    // 有空闲的p才会去唤醒线程
    if _p_ == nil {     // 如果没有指定P,则需要从P的空闲列表中获取一个P
        _p_ = pidleget(0)   // 从P的空闲队列中获取空闲的P
        // 没有空闲的P,意味着所有的P都很忙不需要唤醒
        if _p_ == nil {	
            unlock(&sched.lock)
            // 之前的Cas把nmspinning加一,这里需要减回来
            if spinning {	
                // The caller incremented nmspinning, but there are no idle Ps,
                // so it's okay to just undo the increment and give up.
                //
                // 正常逻辑这里不应该减成负数,否则是系统逻辑存在错误
                if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
                    throw("startm: negative nmspinning")
                }
            }
            // 与acquirem函数呼应,并检查是否有抢占请求发生
            releasem(mp)	
            return  // 没有空闲的P直接返回
        }
    }
    
    // 尝试从m空闲队列中获取正处于睡眠之中的工作线程
    // 所有处于睡眠状态的m都在此队列中
    nmp := mget()
    
    // 没有处于睡眠状态的工作线程,这种情况需要去创建线程
    if nmp == nil {	
        // No M is available, we must drop sched.lock and call newm.
        // However, we already own a P to assign to the M.
        //
        // Once sched.lock is released, another G (e.g., in a syscall),
        // could find no idle P while checkdead finds a runnable G but
        // no running M's because this new M hasn't started yet, thus
        // throwing in an apparent deadlock.
        //
        // Avoid this situation by pre-allocating the ID for the new M,
        // thus marking it as 'running' before we drop sched.lock. This
        // new M will eventually run the scheduler to execute any
        // queued G's.
        // 
        // 这里是为需要新创建的工作线程准备工作
        id := mReserveID()      // 给需要创建的工作线程分配ID
        unlock(&sched.lock)

        // 初始化fn函数,该函数在工作线程刚启动时会被调用
        var fn func()   // nil
        if spinning {   // 如果需要标记当前工作线程是自旋状态
            // The caller incremented nmspinning, so set m.spinning in the new M.
            // 
            // mspinning函数就一行代码 'getg().m.spinning = true' 标记当前工作线程是自旋状态
            // 因为全局的sched.nmspinning已经加一了,因此需要标记m的spinning
            fn = mspinning	
        }
        newm(fn, _p_, id)   // 创建新的工作线程
        // Ownership transfer of _p_ committed by start in newm.
        // Preemption is now safe.
        releasem(mp)
        return
    }
    
    // 到这里说明有正在处于睡眠的工作线程
    unlock(&sched.lock)
    // 从空闲的线程队列中拿出来的 spinning 标志位存在,说明sleep时有问题
    if nmp.spinning { // 系统逻辑存在问题
        throw("startm: m is spinning")
    }
    // 工作线程还与其他P有关,说明有问题
    if nmp.nextp != 0 { // 系统逻辑存在问题
        throw("startm: m has p")
    }
    // 空闲的P中不应该存在g,系统逻辑存在问题
    if spinning && !runqempty(_p_) {	
        throw("startm: p has runnable gs")
    }
    
    // The caller incremented nmspinning, so set m.spinning in the new M.
    //
    // 调用者增加nmspinning,因此将m.spinning设置为新的M。因此当前这个M就是这个自旋的M
    nmp.spinning = spinning // 标记当前需要唤醒的工作线程 自旋的状态
    // 当前M的P暂时放在nextp上
    // 这里也就是为什么新创建的工作线程直接在nextp去取P,原因在这里关联的
    // 因为_p_只在这里存在,因此不会存在其他工作线程使用该P。
    nmp.nextp.set(_p_)	
    // 唤醒工作线程,工作线程睡眠在 nmp.park 上面
    notewakeup(&nmp.park)	
    // Ownership transfer of _p_ committed by wakeup. Preemption is now
    // safe.
    // 
    // 唤醒提交的_p_的所有权转移。 抢占现在是安全的
    // 与acquirem函数呼应,并检查是否有抢占请求发生
    releasem(mp) 
}

acquirem()

  1. m加锁禁止抢占当前m
  2. 文件位置:go1.19.3/src/runtime/runtime1.go
473
474
475
476
477
478
//go:nosplit
func acquirem() *m {
    _g_ := getg()
    _g_.m.locks++
    return _g_.m
}

releasem()

  1. 文件位置:go1.19.3/src/runtime/runtime1.go
482
483
484
485
486
487
488
489
490
func releasem(mp *m) {
    _g_ := getg()
    mp.locks--
    // 当前M没有锁,并且G需要被抢占
    if mp.locks == 0 && _g_.preempt {
        // restore the preemption request in case we've cleared it in newstack
        _g_.stackguard0 = stackPreempt // 设置抢占标志
    }
}

pidleget()

  1. sched.pidle中尝试获取一个空闲的P
  2. 参数now int64:0则取当前时间点。
  3. 返回值:
    • *p:返回一个空闲的P,否则为nil没有空闲的P
    • int64:传入now的时间值,0则是当前时间值。
  4. 文件位置:go1.19.3/src/runtime/proc.go
5727
5728
5729
5730
5731
5732
5733
5734
5735
5736
5737
5738
5739
5740
5741
5742
5743
5744
5745
5746
5747
5748
5749
5750
5751
5752
5753
5754
5755
5756
// pidleget tries to get a p from the _Pidle list, acquiring ownership.
//
// sched.lock must be held.
//
// May run during STW, so write barriers are not allowed.
//
//go:nowritebarrierrec
func pidleget(now int64) (*p, int64) {
    // sched.lock 必须被持有
    assertLockHeld(&sched.lock)

    // 从sched.pidle上获取空闲的P
    _p_ := sched.pidle.ptr()
    if _p_ != nil {
        // Timer may get added at any time now.
        if now == 0 {
            now = nanotime()
        }
        // 设置timerpMask和idlepMask
        timerpMask.set(_p_.id)
        idlepMask.clear(_p_.id)
        // 从全局空闲的P中移除_p_
        sched.pidle = _p_.link 
        // 全局的空闲P的次数减一
        atomic.Xadd(&sched.npidle, -1)
        // limiterEvent跟踪GC CPU限制器的事件。
        _p_.limiterEvent.stop(limiterEventIdle, now)
    }
    return _p_, now
}

mget()

  1. 尝试从sched.midle获取一个空闲的工作线程m,起来绑定P运行。
  2. 文件位置:go1.19.3/src/runtime/proc.go
5547
5548
5549
5550
5551
5552
5553
5554
5555
5556
5557
5558
5559
5560
5561
5562
5563
5564
// Try to get an m from midle list.
// sched.lock must be held.
// May run during STW, so write barriers are not allowed.
//
//go:nowritebarrierrec
func mget() *m {
    assertLockHeld(&sched.lock)

    // 空闲的m在sched.midle上
    mp := sched.midle.ptr()
    if mp != nil {
        // 从sched.midle上移除mp
        sched.midle = mp.schedlink
        // 空闲的m数量减一
        sched.nmidle--
    }
    return mp
}

mReserveID()

  1. 给新创建的工作线程分配唯一的ID
  2. 文件位置:go1.19.3/src/runtime/proc.go
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
// mReserveID returns the next ID to use for a new m. This new m is immediately
// considered 'running' by checkdead.
//
// sched.lock must be held.
func mReserveID() int64 {
    // sched.lock锁必须被持有
    assertLockHeld(&sched.lock)

    // 分配的ID溢出了
    if sched.mnext+1 < sched.mnext {
        throw("runtime: thread ID overflow")
    }
    id := sched.mnext // 分配的ID
    sched.mnext++ // 下一个ID
    // 检查sched.mnext - sched.nmfreed > sched.maxmcount
    // sched.maxmcount 在 runtime.main 中被设置为 10000
    // sched.mnext 下一个分配的ID,该值是累加的
    // sched.nmfreed 已经释放的工作线程数量
    // 因此这里是检查当前已经创建的工作线程数量不能大于最大值
    checkmcount()
    return id
}

notewakeup()

  1. 首先使用atomic.Xchg设置note.key值为1。
  2. 这是为了使被唤醒的线程可以通过查看该值是否等于1来确定是被其它线程唤醒还是意外从睡眠中苏醒了过来,
  3. 如果该值为1则表示是被唤醒的,可以继续工作了。
  4. 但如果该值为0则表示是意外苏醒,需要再次进入睡眠,
  5. 工作线程苏醒之后的处理逻辑我们已经在notesleep()函数中见过,所以这里略过。
  6. 文件位置:go1.19.3/src/runtime/lock_futex.go
139
140
141
142
143
144
145
146
147
func notewakeup(n *note) {
    // 设置n.key = 1, 被唤醒的线程通过查看该值是否等于1来确定是被其它线程唤醒还是意外从睡眠中苏醒
    old := atomic.Xchg(key32(&n.key), 1)
    if old != 0 {   // 如果旧值不是0说明系统逻辑有问题
        print("notewakeup - double wakeup (", old, ")\n")
        throw("notewakeup - double wakeup")
    }
    futexwakeup(key32(&n.key), 1) // 调用futexwakeup唤醒
}

futexwakeup()

  1. 对于Linux平台来说,工作线程通过note睡眠其实是通过futex系统调用睡眠在内核之中,
  2. 所以唤醒处于睡眠状态的线程也需要通过futex系统调用进入内核来唤醒。
  3. 所以这里的futexwakeup()又继续调用包装了futex系统调用的futex()函数来实现唤醒睡眠在内核中的工作线程。
  4. 文件位置:go1.19.3/src/runtime/os_linux.go
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// If any procs are sleeping on addr, wake up at most cnt.
//go:nosplit
func futexwakeup(addr *uint32, cnt uint32) {
    // 调用futex函数唤醒工作线程
    ret := futex(unsafe.Pointer(addr), _FUTEX_WAKE_PRIVATE, cnt, nil, nil, 0)
    if ret >= 0 {	// 调用成功是这里直接返回
        return
    }

    // I don't know that futex wakeup can return
    // EAGAIN or EINTR, but if it does, it would be
    // safe to loop and call futex again.
    systemstack(func() {
        print("futexwakeup addr=", addr, " returned ", ret, "\n")
    })

    // 程序不会到这里来,即使到这里来了,向一个未知地址写入数据直接宕机
    *(*int32)(unsafe.Pointer(uintptr(0x1006))) = 0x1006
}

futex()

  1. futex()函数由汇编代码写成,前面的几条指令都在为futex系统调用准备参数,
  2. 参数准备完成之后则通过SYSCALL指令进入操作系统内核完成线程的唤醒功能。
  3. 内核在完成唤醒工作之后当前工作线程则从内核返回到futex()函数继续执行SYSCALL指令之后的代码并按函数调用链原路返回。
  4. 继续执行其它代码,而被唤醒的工作线程则由内核负责在适当的时候调度到CPU上运行。
  5. 陷入系统调用太长时间的工作线程会在监控线程中剥离PG,具体的参看监控线程相关代码。这里没有标记工作线程陷入系统调用的标志。
  6. 应该是当前函数调用不会形成阻塞。
  7. 文件位置:go1.19.3/src/runtime/sys_linux_amd64.s
549
550
551
552
553
554
555
556
557
558
559
560
561
562
# int64 futex(int32 *uaddr, int32 op, int32 val,
#   struct timespec *timeout, int32 *uaddr2, int32 val2);
TEXT runtime·futex(SB),NOSPLIT,$0
    #这6条指令在为futex系统调用准备参数
    MOVQ    addr+0(FP), DI
    MOVL    op+8(FP), SI
    MOVL    val+12(FP), DX
    MOVQ    ts+16(FP), R10
    MOVQ    addr2+24(FP), R8
    MOVL    val3+32(FP), R9
    MOVL    $SYS_futex, AX  # futex系统调用编号放入AX寄存器
    SYSCALL                 # 系统调用,进入内核
    MOVL    AX, ret+40(FP)  # 系统调用通过AX寄存器返回返回值,这里把返回值保存到内存之中
    RET

创建工作线程

  1. 如果没有正处于休眠状态的工作线程,则需要调用newm()函数新建一个工作线程。

newm()

  1. 创建调度线程和监控线程都是通过该函数,执行该函数时都会把栈切换到g0栈,因为g0栈比较大。
  2. 该函数在以下两种情况下使用:
    1. newm(sysmon, nil, -1):创建【监控线程】。
      • sysmon工作线程开始执行时首先调用的函数。(不是入口函数,newm()创建的入口函数都是固定的mstart()函数)
      • nil表示不需要绑定P
      • -1系统会自动分配一个递增的数字。(全局唯一的ID
    2. newm(fn, _p_, id):创建【调度线程】。
      • fn工作线程开始执行时首先调用的函数。(不是入口函数,newm()创建的入口函数都是固定的mstart()函数)
      • _p_线程启动时需要绑定的P
      • 创建工作线程的唯一ID
  3. 创建一个新的m。它将从对fn调度器的调用开始。fn需要是静态的,而不是一个堆分配闭包。可以使用 m.p==nil运行,因此不允许写入障碍。
  4. id是可选的,预分配的Mid。通过传递-1来省略。
  5. go:nowritebarrierrec:告诉编译器该函数及里面所调用的函数都不插入写屏障代码。
  6. 参数:
    1. fn func():新创建的工作线程启动后需要执行的函数,不能是一个堆分配的闭包,必须是一个静态的函数。
      • 也就是所有创建的线程入口函数都是mstart()函数是线程的入口函数数,参看newosproc()函数。
    2. _p_ *p:新创建的工作线程需要绑定的P,该值可以为 nil,表示不绑定P
    3. id int64:新创建的工作线程的ID值,该值可以是-1,表示系统自动分配一个递增的ID数值。
  7. 文件位置:go1.19.3/src/runtime/proc.go
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
// Create a new m. It will start off with a call to fn, or else the scheduler.
// fn needs to be static and not a heap allocated closure.
// May run with m.p==nil, so write barriers are not allowed.
//
// id is optional pre-allocated m ID. Omit by passing -1.
// 
//go:nowritebarrierrec
func newm(fn func(), _p_ *p, id int64) {
    // allocm adds a new M to allm, but they do not start until created by
    // the OS in newm1 or the template thread.
    //
    // doAllThreadsSyscall requires that every M in allm will eventually
    // start and be signal-able, even with a STW.
    //
    // Disable preemption here until we start the thread to ensure that
    // newm is not preempted between allocm and starting the new thread,
    // ensuring that anything added to allm is guaranteed to eventually
    // start.
    //
    // allocm将一个新的M添加到allm中,但是直到操作系统在newm1或模板线程中创建它们才开始。
    // doAllThreadsSyscall 要求allm中的每个M最终都将启动并可发送信号,即使是STW。
    // 在这里禁用抢占,直到我们启动线程,以确保newm在allocm和启动新线程之间不被抢占,确保添加到allm的任何内容最终都能启动。
    acquirem()	// 禁止当前工作线程被抢占

    // allocm从堆上分配一个m结构体,并绑定M与其他相关例如allp等
    mp := allocm(_p_, fn, id)
    mp.nextp.set(_p_)	// 设置当前M需要用到的P
    mp.sigmask = initSigmask
    if gp := getg(); gp != nil && gp.m != nil && (gp.m.lockedExt != 0 || gp.m.incgo) && GOOS != "plan9" {
        // We're on a locked M or a thread that may have been
        // started by C. The kernel state of this thread may
        // be strange (the user may have locked it for that
        // purpose). We don't want to clone that into another
        // thread. Instead, ask a known-good thread to create
        // the thread for us.
        //
        // This is disabled on Plan 9. See golang.org/issue/22227.
        //
        // TODO: This may be unnecessary on Windows, which
        // doesn't model thread creation off fork.
        lock(&newmHandoff.lock)
        if newmHandoff.haveTemplateThread == 0 {
            throw("on a locked thread with no template thread")
        }
        mp.schedlink = newmHandoff.newm
        newmHandoff.newm.set(mp)
        if newmHandoff.waiting {
            newmHandoff.waiting = false
            notewakeup(&newmHandoff.wake)
        }
        unlock(&newmHandoff.lock)
        // The M has not started yet, but the template thread does not
        // participate in STW, so it will always process queued Ms and
        // it is safe to releasem.
        releasem(getg().m)
        return
    }
    newm1(mp)
    releasem(getg().m)
}

acquirem()

  1. 文件位置:go1.19.3/src/runtime/runtime1.go
473
474
475
476
477
478
//go:nosplit
func acquirem() *m {
    _g_ := getg()
    _g_.m.locks++
    return _g_.m
}

allocm()

  1. 分配一个不与任何线程关联的新m。如果需要,可以使用p作为分配上下文。
  2. fn被记录为新mm.mstartfnid是可选的,预分配的mid。通过传递-1来省略。
  3. 这个函数允许有写障碍,即使调用者没有,因为它借用了_p_
  4. go:yeswritebarrierrec:允许编译器插入写屏障相关代码,因为调用者使用的是go:nowritebarrierrec
  5. 文件位置:go1.19.3/src/runtime/proc.go
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
// Allocate a new m unassociated with any thread.
// Can use p for allocation context if needed.
// fn is recorded as the new m's m.mstartfn.
// id is optional pre-allocated m ID. Omit by passing -1.
//
// This function is allowed to have write barriers even if the caller
// isn't because it borrows _p_.
//
//go:yeswritebarrierrec
func allocm(_p_ *p, fn func(), id int64) *m {
    allocmLock.rlock()  // 读加锁

    // The caller owns _p_, but we may borrow (i.e., acquirep) it. We must
    // disable preemption to ensure it is not stolen, which would make the
    // caller lose ownership.
    acquirem()          // 禁止当前M被抢占

    _g_ := getg()       // 当前g
    // 注意这里是正在执行的工作线程,在此函数中为 malloc 临时借用 p
    if _g_.m.p == 0 {   // 如果当前M没有绑定P,则去绑定一个P
        // 在这个函数中暂时借用p来代替mallocs
        // 当前这个工作线程没有绑定p需要临时借用这个_p_,这种情况可能是sysmon线程中来的。
        acquirep(_p_)   // temporarily borrow p for mallocs in this function
    }

    // Release the free M list. We need to do this somewhere and
    // this may free up a stack we can use.
    //
    // sched.freem 存储的是等待释放的m的链表
    if sched.freem != nil { // 释放需要释放的M列表
        lock(&sched.lock)
        var newList *m
        // freem 是一组已经运行结束的M构成的链表(不是空闲的)。
        for freem := sched.freem; freem != nil; {
            // freeWait 释放g0和删除m是否安全(freeMRef, freeMStack, freeMWait中的一个)
            if freem.freeWait != 0 {
                next := freem.freelink
                freem.freelink = newList
                newList = freem
                freem = next
                continue
            }
            // stackfree must be on the system stack, but allocm is
            // reachable off the system stack transitively from
            // startm.
            //
            // stackfree 必须在系统栈上,但allocm从startm开始在系统栈之外是可访问的。
            systemstack(func() {
                stackfree(freem.g0.stack) // 释放栈
            })
            freem = freem.freelink
        }
        sched.freem = newList
        unlock(&sched.lock)
    }

    // 堆分配一个m
    mp := new(m)		
    // M开始前需要执行的函数,不是M的入口函数,是执行mstart后会调用的函数。
    //  1. 如果是调度线下这里存储的是mspinning()函数
    //  2. 如果是监控线程存储的是sysmon()函数
    mp.mstartfn = fn	
    // 初始化M,主要是把m加入到allm中,m记录allm地址等
    // 该函数在程序初始化过程中也被调用过
    mcommoninit(mp, id)	 

    // In case of cgo or Solaris or illumos or Darwin, pthread_create will make us a stack.
    // Windows and Plan 9 will layout sched stack on OS stack.
    if iscgo || mStackIsSystemAllocated() {
        mp.g0 = malg(-1)
    } else {
        // runtime 的g0栈分配的是64kb左右大小,其他的g0栈分配的是8kb左右大小
        // 关于malg函数,是来自栈相关
        mp.g0 = malg(8192 * sys.StackGuardMultiplier)   // 分配一个大概8Kb左右的g0作为系统栈
    }
    mp.g0.m = mp	// g0与m关联

    if _p_ == _g_.m.p.ptr() {   // 如果前面临时借用了P,这里需要还出来
        releasep()              // 解绑当前工作线程M和P的关联
    }

    releasem(_g_.m)             // 判断当前g是否需要被抢占,设置抢占标志
    allocmLock.runlock()
    return mp
}

acquirep()

  1. p和当前m联系起来。
  2. 这个函数允许有写障碍,即使调用者没有,因为它立即获得_p_
  3. go:yeswritebarrierrec:允许编译器插入写屏障相关代码。
  4. 文件位置:go1.19.3/src/runtime/proc.go
4938
4939
4940
4941
4942
4943
4944
4945
4946
4947
4948
4949
4950
4951
4952
4953
4954
4955
4956
4957
4958
4959
// Associate p and the current m.
//
// This function is allowed to have write barriers even if the caller
// isn't because it immediately acquires _p_.
//
//go:yeswritebarrierrec
func acquirep(_p_ *p) {
    // Do the part that isn't allowed to have write barriers.
    wirep(_p_)	// 把 P 与 M 绑定起来

    // Have p; write barriers now allowed.
    // 现在有p了;现在允许写屏障。

    // Perform deferred mcache flush before this P can allocate
    // from a potentially stale mcache.
    // 在此P可以从可能不新鲜的mcache分配之前,执行延迟的mcache刷新。
    _p_.mcache.prepareForSweep()

    if trace.enabled {
        traceProcStart()
    }
}
  1. wirep是acquirep的第一步,它实际上将当前M关联到_p_
  2. 因为我们还没有P,所以我们可以在这部分不允许写障碍。
  3. go:nowritebarrierrec:不允许编译器插入写屏障相关代码。
  4. 文件位置:go1.19.3/src/runtime/proc.go
4959
4960
4961
4962
4963
4964
4965
4966
4967
4968
4969
4970
4971
4972
4973
4974
4975
4976
4977
4978
4979
4980
4981
4982
4983
4984
4985
// wirep is the first step of acquirep, which actually associates the
// current M to _p_. This is broken out so we can disallow write
// barriers for this part, since we don't yet have a P.
//
//go:nowritebarrierrec
//go:nosplit
func wirep(_p_ *p) {
    _g_ := getg()   // g

    // 这里来的m一定需要是没绑定p的。
    if _g_.m.p != 0 {
        throw("wirep: already in go")
    }
    // 当前p也是不能绑定m的,并且当前p的状态不能是 _Pidle
    if _p_.m != 0 || _p_.status != _Pidle {
        id := int64(0)
        if _p_.m != 0 {
            id = _p_.m.ptr().id
        }
        print("wirep: p->m=", _p_.m, "(", id, ") p->status=", _p_.status, "\n")
        throw("wirep: invalid p state")
    }
    // p 与 m 相互绑定
    _g_.m.p.set(_p_)        // m.p = _p_
    _p_.m.set(_g_.m)        // _p_.m = m
    _p_.status = _Prunning  // 设置p的状态为运行中
}

mcommoninit()

  1. 预分配的ID可以作为'ID'传递,也可以通过传递-1来省略。
  2. 文件位置:go1.19.3/src/runtime/proc.go
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
// Pre-allocated ID may be passed as 'id', or omitted by passing -1.
func mcommoninit(mp *m, id int64) {
    _g_ := getg()           // g

    // g0 stack won't make sense for user (and is not necessary unwindable).
    if _g_ != _g_.m.g0 {    // 不能是g0栈
        callers(1, mp.createstack[:])
    }

    lock(&sched.lock)

    if id >= 0 {
        mp.id = id
    } else {
        mp.id = mReserveID()
    }

    lo := uint32(int64Hash(uint64(mp.id), fastrandseed))
    hi := uint32(int64Hash(uint64(cputicks()), ^fastrandseed))
    if lo|hi == 0 {
        hi = 1
    }
    // Same behavior as for 1.17.
    // TODO: Simplify ths.
    if goarch.BigEndian {
        mp.fastrand = uint64(lo)<<32 | uint64(hi)
    } else {
        mp.fastrand = uint64(hi)<<32 | uint64(lo)
    }

    mpreinit(mp)            // 信号相关
    if mp.gsignal != nil {
        mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard
    }

    // Add to allm so garbage collector doesn't free g->m
    // when it is just in a register or thread-local storage.
    mp.alllink = allm       // 记录当前m.alllink的全局allm地址

    // NumCgoCall() iterates over allm w/o schedlock,
    // so we need to publish it safely.
    atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))	// 把m添加到全局allm中
    unlock(&sched.lock)

    // Allocate memory to hold a cgo traceback if the cgo call crashes.
    if iscgo || GOOS == "solaris" || GOOS == "illumos" || GOOS == "windows" {
        mp.cgoCallers = new(cgoCallers)
    }
}

newm1()

  1. 文件位置:go1.19.3/src/runtime/proc.go
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
func newm1(mp *m) {
    if iscgo {  // cgo相关代码
        var ts cgothreadstart
        if _cgo_thread_start == nil {
            throw("_cgo_thread_start missing")
        }
        ts.g.set(mp.g0)
        ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))
        ts.fn = unsafe.Pointer(abi.FuncPCABI0(mstart))
        if msanenabled {
            msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
        }
        if asanenabled {
            asanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
        }
        execLock.rlock() // Prevent process clone.
        asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts))
        execLock.runlock()
        return
    }
    execLock.rlock() // Prevent process clone.
    // newosproc函数 调用clone函数创建一个系统线程
    // 新建的这个系统线程将从mstart()函数开始运行。
    newosproc(mp)
    execLock.runlock()
}

newosproc()

  1. 可以m.p==nil 运行,因此不允许写屏障。
  2. 文件位置:go1.19.3/src/runtime/os_linux.go
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
// May run with m.p==nil, so write barriers are not allowed.
//
//go:nowritebarrier
func newosproc(mp *m) {
    stk := unsafe.Pointer(mp.g0.stack.hi)   // 获取当前新创建的M的g0栈栈顶位置
    /*
     * note: strace gets confused if we use CLONE_PTRACE here.
     */
    if false {
        print("newosproc stk=", stk, " m=", mp, " g=", mp.g0, " clone=", abi.FuncPCABI0(clone), " id=", mp.id, " ostk=", &mp, "\n")
    }

    // Disable signals during clone, so that the new thread starts
    // with signals disabled. It will enable them in minit.
    // 
    // 在克隆期间禁用信号,以便新线程以禁用信号开始。 它将在minit中启用它们。
    var oset sigset
    sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
    //  cloneFlags = _CLONE_VM | /* share memory */ // 指定父子线程共享进程地址空间
    //      _CLONE_FS | /* share cwd, etc */
    //      _CLONE_FILES | /* share fd table */
    //      _CLONE_SIGHAND | /* share sig handler table */
    //      _CLONE_SYSVSEM | /* share SysV semaphore undo lists (see issue #20763) */
    //      _CLONE_THREAD /* revisit - okay for now */  // 创建子线程而不是子进程
    // 程序的入口都是【mstart()】函数开始
    ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(abi.FuncPCABI0(mstart)))
    sigprocmask(_SIG_SETMASK, &oset, nil)

    // 怎么也应该出现 ret 小于0的情况
    if ret < 0 {
        print("runtime: failed to create new OS thread (have ", mcount(), " already; errno=", -ret, ")\n")
        if ret == -_EAGAIN {
            println("runtime: may need to increase max user processes (ulimit -u)")
        }
        throw("newosproc")
    }
}

clone()

C函数原型:int32 clone(int32 flags, void *stk, M *mp, G *gp, void (*fn)(void))

  1. int32 flags:指定内核创建线程时需要的选项。
  2. void *stk:新线程应该使用的栈:
    1. 因为即将被创建的线程与当前线程共享同一个进程地址空间,所以这里必须为子线程指定其使用的栈,否则父子线程会共享同一个栈从而造成混乱。
    2. 从上面的newosproc()函数可以看出,新线程使用的栈为m.g0.stack.lo~m.g0.stack.hi这段内存,而这段内存是newm()函数在创建m结构体对象时从进程的堆上分配而来的。
  3. M *mp:工作线程 M 的信息记录。
  4. G *gpg0栈信息记录。
  5. void (*fn)(void):子线程程序入口函数。
  6. 上面三个参数(M *mpG *gpvoid (*fn)(void))保存到寄存器(R13R9R12)中:
    1. 之所以需要在系统调用之前保存这几个参数,原因在于这几个参数目前还位于父线程的栈之中。
    2. 一旦通过系统调用把子线程创建出来之后,子线程将会使用我们在clone系统调用时给它指定的栈。
    3. 所以这里需要把这几个参数先保存到寄存器,等子线程从系统调用返回后直接在寄存器中获取这几个参数。
    4. 这里要注意的是虽然这个几个参数值保存在了父线程的寄存器之中,但创建子线程时,操作系统内核会把父线程的所有寄存器帮我们复制一份给子线程,所以当子线程开始运行时就能拿到父线程保存在寄存器中的值,从而拿到这几个参数。
  7. 文件位置:go1.19.3/src/runtime/sys_linux_amd64.s
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
# int32 clone(int32 flags, void *stk, M *mp, G *gp, void (*fn)(void));
TEXT runtime·clone(SB),NOSPLIT,$0
    # 1) 接受参数分蘖放入 DI、SI、R13、R9、R12寄存器中
    # 清除 DX、R10、D8寄存器的值

    # 第一个参数 flags,clone需要的参数
    MOVL    flags+0(FP), DI # DI = flags
    # 第二个参数 stk,g0栈空间
    MOVQ    stk+8(FP), SI   # SI = stk
    MOVQ    $0, DX          # 清除DX寄存器
    MOVQ    $0, R10         # 清除R10寄存器
    MOVQ    $0, R8          # 清除R8寄存器
    # Copy mp, gp, fn off parent stack for use by child.
    # Careful: Linux system call clobbers CX and R11.
    # 
    # 从父堆栈中复制 mp、gp、fn 以供子级使用
    # 小心:Linux系统调用clobbers CX和R11
    # 第三个参数 mp
    # 在clone后子线程开始运行时,R13、R9、R12的值会被拷贝给子线程
    MOVQ    mp+16(FP), R13  # R13 = mp
    # 第四个参数 gp,这里是 g0
    MOVQ    gp+24(FP), R9   # R9 = gp
    # 第五个参数 fn,mstart()函数
    MOVQ    fn+32(FP), R12  # R12 = fn
    
    # 2) 判断 mp 和 gp 的值是为 nil
    
    # 判断mp==nil和g0=nil
    # m 如果R13为0则跳转
    CMPQ    R13, $0
    JEQ	nog1
    # g	如果R9为0则跳转
    CMPQ    R9, $0    		
    JEQ	nog1
    
    # 3) 找到需要设置TLS的地址值,也就是&m.tls[1]
    # 调用系统SYS_clone函数克隆线程
    
    # 把m.tls地址存入R8寄存器
    LEAQ    m_tls(R13), R8  # R8 = TLS 
#ifdef GOOS_android
    # Android stores the TLS offset in runtime·tls_g.
    SUBQ    runtime·tls_g(SB), R8
#else
    # R8 = -8(FS); R8=&m.tls[1]处地址
    ADDQ    $8, R8	# ELF wants to use -8(FS)
#endif
    # 添加CLONE_SETTLS标志
    ORQ     $0x00080000, DI #add flag CLONE_SETTLS(0x00080000) to call clone
nog1:
    MOVL    $SYS_clone, AX  # 写入clone函数标志,然后调用系统函数
    # 系统调用约定寄存器 DI SI DX R10 R8 R9 参数传参
    # DI = flags
    # SI = stk
    # DX = 0
    # R10 = 0
    # R8 = R8=&m.tls[1]
    # R9 = gp
    SYSCALL
    
    # 4) 系统调用后,新创建的子线程和当前线程都会从系统调用中返回然后执行后面的代码
    # 
    # 那么从系统调用返回之后我们怎么知道哪个是父线程哪个是子线程,从而来决定它们的执行流程?
    # 使用过fork系统调用的读者应该知道,我们需要通过返回值来判断父子线程:
    #   1. 系统调用的返回值如果是0则表示这是子线程
    #   2. 不为0则表示这个是父线程

    # 4.1) 父线程的处理逻辑
    # In parent, return.
    # 
    # 在父线程中,直接返回。
    # 判断系统调用SYS_clone的返回值AX与0比较
    CMPQ    AX, $0
    # JEQ 表示AX是0则执行 3(PC)跳过3条指令
    JEQ	3(PC)   #跳转到子线程部分
    # 这里是父线程直接把返回值写入栈,然后退出函数
    MOVL    AX, ret+40(FP)	
    RET			

    # 4.2) 子线程的处理逻辑,设置SP,判断mp和gp,设置mp.procid
    # In child, on new stack.
    #
    # 在子线程中,在new栈上。
    # 新创建的子线程从这里开始,注意一下代码是在子线程中,寄存器也是子线程的
    # 设置CPU栈顶寄存器指向子线程的栈顶,这条指令看起来是多余的?内核应该已经把SP设置好了
    MOVQ    SI, SP   

    # If g or m are nil, skip Go-related setup.
    #
    # 如果 g 或 m 为 nil,跳过 Go-related 设置。
    # m	新创建的m结构体对象的地址,由父线程保存在R13寄存器中的值被复制到了子线程
    CMPQ    R13, $0 # R13 = mp
    JEQ	nog2 # R13 为 0 时跳转
    # g	m.g0的地址,由父线程保存在R9寄存器中的值被复制到了子线程
    CMPQ    R9, $0  # R9 = gp
    JEQ	nog2 # R9 为 0 时跳转

    # Initialize m->procid to Linux tid
    # 
    # 将m->procid初始化为Linux tid。
    MOVL    $SYS_gettid, AX	# 通过gettid()系统调用获取线程ID(tid)
    SYSCALL
    MOVQ    AX, m_procid(R13)   # m.procid = tid
    
    # Set FS to point at m->tls.
    # 
    # 新线程刚刚创建出来,还未设置线程本地存储,即m结构体对象还未与工作线程关联起来,
    # 下面的指令负责设置新线程的TLS,把m对象和工作线程关联起来
    # 这两行代码在go1.18中消失了,原因在于CLONE_SETTLS配合参数和R8寄存器在clone中被设置了
    # LEAQ  m_tls(R13), DI  # 取m.tls字段的地址	
    # CALL  runtime·settls(SB)

    # In child, set up new stack
    get_tls(CX)	# CX=&m.tls[1]; CX=TLS
    MOVQ    R13, g_m(R9) # g0.m = m
    MOVQ    R9, g(CX)    # m.tls[0]=&g0
    # R14=&g0 R14寄存器主要存储当前正在运行的goroutine
    MOVQ    R9, R14 # set g register 
    CALL    runtime·stackcheck(SB)  # 检查 SP 是否在 [g->stack.lo, g->stack.hi) 范围内
    
nog2:
    # Call fn. This is the PC of an ABI0 function.
    # 
    # 调用mstart()函数开始调度循环
    CALL    R12	# 永不返回

    # It shouldn't return. If it does, exit that thread.
    MOVL    $111, DI
    MOVL    $SYS_exit, AX
    SYSCALL
    JMP	-3(PC)  // keep exiting

stackcheck()

  1. 检查SP是否在[g->stack.lo, g->stack.hi)范围内。
  2. 文件位置:go1.19.3/src/runtime/sys_linux_amd64.s
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
# check that SP is in range [g->stack.lo, g->stack.hi)
TEXT runtime·stackcheck(SB), NOSPLIT, $0-0
    get_tls(CX)         # CX=TLS
    MOVQ    g(CX), AX   # AX=g0
    # g0.stack.hi 与 SP 比较
    CMPQ    (g_stack+stack_hi)(AX), SP
    JHI	2(PC)
    CALL    runtime·abort(SB)
    # g0.stack.lo 与 SP 比较
    CMPQ    SP, (g_stack+stack_lo)(AX)
    JHI	2(PC)
    CALL    runtime·abort(SB)
    RET