Sync Pool 源码分析

Sync Pool

Posted by WR on November 18, 2022

什么是 Sync Pool ?

​ 是一个临时对象能够独自存储并且获取的数据池,目的是为了缓存已分配但是未使用的数据项以便之后再使用。

特性

  • Pool 是协程并发安全的
  • 任何对象可能会毫无征兆的被自动移除 - 被GC回收
  • 当前 Pool 为空的时候会从其它 Pool steal 一个

适用场景

  • 不适用短的生命周期对象
  • 当多个协程重复创建同一个对象的时候,可以通过 sync.pool 减少对象的创建,减少 GC 压力

案例

​ 接下来看一个 Sync Pool 的简单使用案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import "sync"

type People struct {
	Age int
}

func main() {
  // 创建一个对象并放入对象池
	pool := sync.Pool{
		New: func() interface{} {
			return &People{
				Age: 1,
			}
		},
	}

  // 从对象池获取对象
	p1 := pool.Get().(*People)
	println(p1.Age)

  // 对象放回对象池
	pool.Put(p1)
 
  // 从对象池获取刚刚放回的对象
	p2 := pool.Get().(*People)
	println(p2.Age)
}

Sync.Pool 实现原理

​ 由 GMP 模型可知,每个 M 同一时间对应一个 P,也就意味着在 P 上执行的逻辑是单线程的。sync.Pool 就是充分利用了 GMP 这一特点。对于同一个 sync.Pool ,每个 P 都有一个自己的本地对象池 poolLocal

结构

Pool 结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// A Pool must not be copied after first use.
type Pool struct {
	noCopy noCopy

	local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
	localSize uintptr        // size of the local array

	victim     unsafe.Pointer // local from previous cycle
	victimSize uintptr        // size of victims array

	// New optionally specifies a function to generate
	// a value when Get would otherwise return nil.
	// It may not be changed concurrently with calls to Get.
	New func() interface{}
}
noCopy

​ NoCopy embed this type into a struct, which mustn’t be copied, so go vet gives a warning if this struct is copied.

​ NoCopy 嵌入到一个结构体里面,表明这个结构体不可被复制,如果这个结构体被复制,可以使用 go vet 在编译和运行阶段发出一个警告。

local

​ local 是一个数组,长度为 P(GMP 模型里面的P) 的个数。元素实际类型是 [p]poolLocal,存储着每个 P 对应的对象池。

localSize

​ 代表 local 的大小,因为可以在运行时通过 runtime.GOMAXPROCS 调节 P 的大小,为此需要通过 localSize 获取 local 数组的大小。

victim

​ 上一批被淘汰的对象池。

victimSize

​ 上一批被淘汰的对象池大小。

New

​ 用户提供用于创建对象的函数(非必填字段),Get 返回对应函数创建的对象,没有设置创建函数则返回 nil。

poolLocal 结构
1
2
3
4
5
6
7
8
9
10
11
12
13
type poolLocal struct {
	poolLocalInternal

	// Prevents false sharing on widespread platforms with
	// 128 mod (cache line size) = 0 .
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

// Local per-P Pool appendix.
type poolLocalInternal struct {
	private interface{} // Can be used only by the respective P.
	shared  poolChain   // Local P can pushHead/popHead; any P can popTail.
}

​ poolLocal 包含 poolLocalInternal 和 pad,pad 是用于 CPU cache line 对齐,主要用于提高CPU 缓存命中率,避免被频繁对象被频繁淘汰。关于 cache line 可以看一下 Wikipedia 的说明 。

poolLocal 有一个 pad 属性,从这个属性的定义方式来看,明显是为了凑齐了 128 Byte 的整数倍。为什么会这么做呢?

这里是为了避免 CPU Cache 的 false sharing 问题:CPU Cache Line 通常是以 64 byte 或 128 byte 为单位。在我们的场景中,各个 P 的 poolLocal 是以数组形式存在一起。假设 CPU Cache Line 为 128 byte,而 poolLocal 不足 128 byte 时,那 cacheline 将会带上其他 P 的 poolLocal 的内存数据,以凑齐一整个 Cache Line。如果这时,两个相邻的 P 同时在两个不同的 CPU 核上运行,将会同时去覆盖刷新 CacheLine,造成 Cacheline 的反复失效,那 CPU Cache 将失去了作用。

CPU Cache 是距离 CPU 最近的 cache,如果能将其运用好,会极大提升程序性能。Golang 这里为了防止出现 false sharing 问题,主动使用 pad 的方式凑齐 128 个 byte 的整数倍,这样就不会和其他 P 的 poolLocal 共享一套 CacheLine。

​ poolLocalInternal 包含两个对象:

​ private:私有对象,如果私有对象有值,Get 和 Put 都会优先存取 private 变量,如果 private 变量可以满足,则不再进行其它操作。

​ shared:类型为 poolChain,是 P 的对象池,是个链表结构,当前 P 可以从链表头进行 Push 和 Pop,其它 P 可从链表尾部进行 Pop,这样主要是为了减少锁竞争。

​ 接下来看一下 poolChain 的机构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// poolChain is a dynamically-sized version of poolDequeue.
//
// This is implemented as a doubly-linked list queue of poolDequeues
// where each dequeue is double the size of the previous one. Once a
// dequeue fills up, this allocates a new one and only ever pushes to
// the latest dequeue. Pops happen from the other end of the list and
// once a dequeue is exhausted, it gets removed from the list.
type poolChain struct {
	// head is the poolDequeue to push to. This is only accessed
	// by the producer, so doesn't need to be synchronized.
	head *poolChainElt

	// tail is the poolDequeue to popTail from. This is accessed
	// by consumers, so reads and writes must be atomic.
	tail *poolChainElt
}

​ poolChain 的头和尾分别指向 poolChainElt,head 只能被 当前 P 自己访问,即生产者往链表头存数据,而链表尾 tail,可被多个 P 访问,即其它 P 来偷取数据的场景,所以需要加锁。

​ poolChainElt 结构如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type poolChainElt struct {
	poolDequeue

	// next and prev link to the adjacent poolChainElts in this
	// poolChain.
	//
	// next is written atomically by the producer and read
	// atomically by the consumer. It only transitions from nil to
	// non-nil.
	//
	// prev is written atomically by the consumer and read
	// atomically by the producer. It only transitions from
	// non-nil to nil.
	next, prev *poolChainElt
}

​ poolChainElt 是个双端来链表,对应的对象元素是 poolDequeue,poolDequeue结构如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// poolDequeue is a lock-free fixed-size single-producer,
// multi-consumer queue. The single producer can both push and pop
// from the head, and consumers can pop from the tail.
//
// It has the added feature that it nils out unused slots to avoid
// unnecessary retention of objects. This is important for sync.Pool,
// but not typically a property considered in the literature.
type poolDequeue struct {
	// headTail packs together a 32-bit head index and a 32-bit
	// tail index. Both are indexes into vals modulo len(vals)-1.
	//
	// tail = index of oldest data in queue
	// head = index of next slot to fill
	//
	// Slots in the range [tail, head) are owned by consumers.
	// A consumer continues to own a slot outside this range until
	// it nils the slot, at which point ownership passes to the
	// producer.
	//
	// The head index is stored in the most-significant bits so
	// that we can atomically add to it and the overflow is
	// harmless.
	headTail uint64

	// vals is a ring buffer of interface{} values stored in this
	// dequeue. The size of this must be a power of 2.
	//
	// vals[i].typ is nil if the slot is empty and non-nil
	// otherwise. A slot is still in use until *both* the tail
	// index has moved beyond it and typ has been set to nil. This
	// is set to nil atomically by the consumer and read
	// atomically by the producer.
	vals []eface
}

​ 通过官方注释可知, poolDequeue 是一个 ring buffer 结构,ring buffer 有如下优点:

  1. 预先分配好内存,且分配的内存项可不断复用。
  2. 由于 ring buffer ==本质上是个数组==,是连续内存结构,非常利于 CPU Cache。在访问poolDequeue 某一项时,其附近的数据项都有可能加载到统一 Cache Line 中,访问速度更快。

​ poolDequeue 作为一个 ring buffer,自然需要记录下其 head 和 tail 的值。但在 poolDequeue 的定义中,head 和 tail 并不是独立的两个变量,只有一个 uint64 的 headTail 变量。

这是因为 headTail 变量将 head 和 tail 打包在了一起:其中高 32 位是 head 变量,低 32 位是 tail 变量。如下图所示:

image

​ HeadTail 打包过程

1
2
3
4
5
func (d *poolDequeue) pack(head, tail uint32) uint64 {
	const mask = 1<<dequeueBits - 1
	return (uint64(head) << dequeueBits) |
		uint64(tail&mask)
}

​ HeadTail 解包过程

1
2
3
4
5
6
func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
	const mask = 1<<dequeueBits - 1
	head = uint32((ptrs >> dequeueBits) & mask)
	tail = uint32(ptrs & mask)
	return
}

​ 为什么会有这个复杂的打包操作呢?这个其实是个非常常见的 lock free 优化手段。对于一个 poolDequeue 来说,可能会被多个 P 同时访问,这个时候就会带来并发问题。

​ 例如:当 ring buffer 空间仅剩一个的时候,即 head - tail = 1。 如果多个 P 同时访问 ring buffer,在没有任何并发措施的情况下,两个 P 都可能会拿到对象,这肯定是不符合预期的。

​ 在不引入 Mutex 锁的前提下,sync.Pool 是怎么实现的呢?sync.Pool 利用了 atomic 包中的 CAS 操作。两个 P 都可能会拿到对象,但在最终设置 headTail 的时候,只会有一个 P 调用 CAS 成功,另外一个 CAS 失败。

1
atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2)

Put 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Put adds x to the pool.
func (p *Pool) Put(x interface{}) {
	if x == nil {
		return
	}
	if race.Enabled {
		if fastrand()%4 == 0 {
			// Randomly drop x on floor.
			return
		}
		race.ReleaseMerge(poolRaceAddr(x))
		race.Disable()
	}
	l, _ := p.pin()
	if l.private == nil {
		l.private = x
		x = nil
	}
	if x != nil {
		l.shared.pushHead(x)
	}
	runtime_procUnpin()
	if race.Enabled {
		race.Enable()
	}
}

​ 从以上代码可以看到,在 Put 函数中首先调用了 pin()pin 函数非常重要,它有三个作用:

  1. 初始化或者重新创建local数组。 当 local 数组为空,或者和当前的 runtime.GOMAXPROCS 不一致时,将触发重新创建 local 数组,以和 P 的个数保持一致。
  2. 取当前 P 对应的本地缓存池 poolLocal。其实代码逻辑很简单,就是从 local 数组中根据索引取元素。
  3. 防止当前 P 被抢占。 这点非常重要。在 Go 1.14 以后,Golang 实现了抢占式调度:一个 goroutine 占用 P 时间过长,将会被调度器强制挂起。如果一个 goroutine 在执行 Put 或者 Get 期间被挂起,有可能下次恢复时,绑定就不是上次的 P 了。那整个过程就会完全乱掉。因此,这里使用了 runtime 包里面的 procPin,暂时不允许 P 被抢占。

​ pin 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// pin pins the current goroutine to P, disables preemption and
// returns poolLocal pool for the P and the P's id.
// Caller must call runtime_procUnpin() when done with the pool.
func (p *Pool) pin() (*poolLocal, int) {
	pid := runtime_procPin()
	// In pinSlow we store to local and then to localSize, here we load in opposite order.
	// Since we've disabled preemption, GC cannot happen in between.
	// Thus here we must observe local at least as large localSize.
	// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
	s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	l := p.local                              // load-consume
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
	return p.pinSlow()
}

​ 接着,Put 函数会优先设置当前 poolLocal 私有变量 private。如果设置私有变量成功,那么将不会往 shared 缓存池写了。这样操作效率会更高效。

​ 如果私有变量之前已经设置过了,那就只能往当前 P 的本地缓存池 poolChain 里面写了。我们接下来看下,sync.Pool 的每个 P 的内部缓存池 poolChain 是怎么实现的。

在 Put 的时候,会去直接取 poolChain 的链表头元素 HEAD:

  • 如果 HEAD 不存在 ,则新建一个 buffer 长度为 8 的 poolDequeue,并将对象放置在里面。
  • 如果 HEAD 存在,且 buffer 尚未满,则将元素直接放置在 poolDequeue 中。
  • 如果 HEAD 存在,但 buffer 满了,则新建一个新的 poolDequeue,长度为上个 HEAD 的 2 倍。同时,将 poolChain 的 HEAD 指向新的元素。

Put 的过程比较简单,整个过程不需要和其他 P 的 poolLocal 进行交互。

Get 实现

​ 通过 Get 操作,可以从 sync.Pool 中获取一个对象。相比于 Put 函数,Get 的实现更为复杂。不仅涉及到对当前 P 本地对象池的操作,还涉及对其他 P 的本地对象池的对象窃取。其代码逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// Get selects an arbitrary item from the Pool, removes it from the
// Pool, and returns it to the caller.
// Get may choose to ignore the pool and treat it as empty.
// Callers should not assume any relation between values passed to Put and
// the values returned by Get.
//
// If Get would otherwise return nil and p.New is non-nil, Get returns
// the result of calling p.New.
func (p *Pool) Get() interface{} {
	if race.Enabled {
		race.Disable()
	}
	l, pid := p.pin()
	x := l.private
	l.private = nil
	if x == nil {
		// Try to pop the head of the local shard. We prefer
		// the head over the tail for temporal locality of
		// reuse.
		x, _ = l.shared.popHead()
		if x == nil {
			x = p.getSlow(pid)
		}
	}
	runtime_procUnpin()
	if race.Enabled {
		race.Enable()
		if x != nil {
			race.Acquire(poolRaceAddr(x))
		}
	}
	if x == nil && p.New != nil {
		x = p.New()
	}
	return x
}

​ 其中 pin() 的作用和 private 对象的作用,和 PUT 操作中的一致。

​ 首先,Get 函数会尝试从当前 P 的 本地对象池 poolChain 中获取对象。从当前 P 的 poolChain 中取数据时,是从链表头部开始取数据。 具体来说,先取位于链表头的 poolDequeue,然后从 poolDequeue 的头部开始取数据。

​ 如果从当前 P 的 poolChain 取不到数据,意味着当前 P 的缓存池为空,那么将尝试从其他 P 的缓存池中 窃取对象。这也对应 getSlow 函数的内部实现。

​ 在 getSlow 函数,会将当前 P 的索引值不断递增,逐个尝试从其他 P 的 poolChain 中取数据。==注意,当尝试从其他 P 的 poolChain 中取数据时,是从链表尾部开始取的==。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (p *Pool) getSlow(pid int) interface{} {
	// See the comment in pin regarding ordering of the loads.
	size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	locals := p.local                            // load-consume
	// Try to steal one element from other procs.
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i+1)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// Try the victim cache. We do this after attempting to steal
	// from all primary caches because we want objects in the
	// victim cache to age out if at all possible.
	size = atomic.LoadUintptr(&p.victimSize)
	if uintptr(pid) >= size {
		return nil
	}
	locals = p.victim
	l := indexLocal(locals, pid)
	if x := l.private; x != nil {
		l.private = nil
		return x
	}
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// Mark the victim cache as empty for future gets don't bother
	// with it.
	atomic.StoreUintptr(&p.victimSize, 0)

	return nil
}

​ 在对其他 P 的 poolChain 调用 popTail,会先取位于链表尾部的 poolDequeue,然后从 poolDequeue 的尾部开始取数据。如果从这个 poolDequeue 中取不到数据,则意味着该 poolDequeue 为空,则直接从该 poolDequeue 从 poolChain 中移除,同时尝试下一个 poolDequeue。

​ 如果从其他 P 的本地对象池,也拿不到数据。接下来会尝试从 victim 中取数据。上文讲到 victim 是上一轮被清理的对象池, 从 victim 取对象也是 popTail 的方式。

​ 最后,如果所有的缓存池都都没有数据了,这个时候会调用用户设置的 New 函数,创建一个新的对象。 image

​ sync.Pool 在设计的时候,当操作本地的 poolChain 时,无论是 push 还是 pop,都是从头部开始。而当从其他 P 的 poolChain 获取数据,只能从尾部 popTail 取。这样可以尽量减少并发冲突。

对象的清理

​ sync.Pool 没有对外开放对象清理策略和清理接口。我们上面讲到,当窃取其他 P 的对象时,会逐步淘汰已经为空的 poolDequeue。但除此之外,sync.Pool 一定也还有其他的对象清理机制,否则对象池将可能会无限制的膨胀下去,造成内存泄漏。

​ Golang 对 sync.Pool 的清理逻辑非常简单粗暴。首先每个被使用的 sync.Pool,都会在初始化阶段被添加到全局变量 allPools []*Pool 对象中。Golang 的 runtime 将会在 每轮 GC 前,触发调用 poolCleanup 函数,清理 allPools。代码逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func init() {
	runtime_registerPoolCleanup(poolCleanup)
}

func poolCleanup() {
	// This function is called with the world stopped, at the beginning of a garbage collection.
	// It must not allocate and probably should not call any runtime functions.

	// Because the world is stopped, no pool user can be in a
	// pinned section (in effect, this has all Ps pinned).

	// Drop victim caches from all pools.
	for _, p := range oldPools {
		p.victim = nil
		p.victimSize = 0
	}

	// Move primary cache to victim cache.
	for _, p := range allPools {
		p.victim = p.local
		p.victimSize = p.localSize
		p.local = nil
		p.localSize = 0
	}

	// The pools with non-empty primary caches now have non-empty
	// victim caches and no pools have primary caches.
	oldPools, allPools = allPools, nil
}

​ 这里需要正式介绍下 sync.Pool 的 victim(牺牲者) 机制,我们在 Get 函数的对象窃取逻辑中也有提到 victim。

​ 在每轮 sync.Pool 的清理中,暂时不会完全清理对象池,而是将其放在 victim 中。等到下一轮清理,才完全清理掉 victim。也就是说,每轮 GC 后 sync.Pool 的对象池都会转移到 victim 中,同时将上一轮的 victim 清空掉。

​ 为什么这么做呢? ​ 这是因为 Golang 为了防止 GC 之后 sync.Pool 被突然清空,对程序性能造成影响。因此先利用 victim 作为过渡,如果在本轮的对象池中实在取不到数据,也可以从 victim 中取,这样程序性能会更加平滑。

​ victim 机制最早用在 CPU Cache 中,详细可以阅读这篇 wiki: Victim_cache

引用

https://en.wikipedia.org/wiki/CPU_cache

https://en.wikipedia.org/wiki/Victim_cache

https://www.cyhone.com/articles/think-in-sync-pool