什么是 Sync Pool ?
是一个临时对象能够独自存储并且获取的数据池,目的是为了缓存已分配但是未使用的数据项以便之后再使用。
特性
- Pool 是协程并发安全的
- 任何对象可能会毫无征兆的被自动移除 - 被GC回收
- 当前 Pool 为空的时候会从其它 Pool steal 一个
适用场景
- 不适用短的生命周期对象
- 当多个协程重复创建同一个对象的时候,可以通过 sync.pool 减少对象的创建,减少 GC 压力
案例
接下来看一个 Sync Pool 的简单使用案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main
import "sync"
type People struct {
Age int
}
func main() {
// 创建一个对象并放入对象池
pool := sync.Pool{
New: func() interface{} {
return &People{
Age: 1,
}
},
}
// 从对象池获取对象
p1 := pool.Get().(*People)
println(p1.Age)
// 对象放回对象池
pool.Put(p1)
// 从对象池获取刚刚放回的对象
p2 := pool.Get().(*People)
println(p2.Age)
}
Sync.Pool 实现原理
由 GMP 模型可知,每个 M 同一时间对应一个 P,也就意味着在 P 上执行的逻辑是单线程的。sync.Pool 就是充分利用了 GMP 这一特点。对于同一个 sync.Pool ,每个 P 都有一个自己的本地对象池 poolLocal
。
结构
Pool 结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// A Pool must not be copied after first use.
type Pool struct {
noCopy noCopy
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array
victim unsafe.Pointer // local from previous cycle
victimSize uintptr // size of victims array
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() interface{}
}
noCopy
NoCopy embed this type into a struct, which mustn’t be copied, so go vet
gives a warning if this struct is copied.
NoCopy 嵌入到一个结构体里面,表明这个结构体不可被复制,如果这个结构体被复制,可以使用 go vet 在编译和运行阶段发出一个警告。
local
local 是一个数组,长度为 P(GMP 模型里面的P) 的个数。元素实际类型是 [p]poolLocal,存储着每个 P 对应的对象池。
localSize
代表 local 的大小,因为可以在运行时通过 runtime.GOMAXPROCS 调节 P 的大小,为此需要通过 localSize 获取 local 数组的大小。
victim
上一批被淘汰的对象池。
victimSize
上一批被淘汰的对象池大小。
New
用户提供用于创建对象的函数(非必填字段),Get 返回对应函数创建的对象,没有设置创建函数则返回 nil。
poolLocal 结构
1
2
3
4
5
6
7
8
9
10
11
12
13
type poolLocal struct {
poolLocalInternal
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
// Local per-P Pool appendix.
type poolLocalInternal struct {
private interface{} // Can be used only by the respective P.
shared poolChain // Local P can pushHead/popHead; any P can popTail.
}
poolLocal 包含 poolLocalInternal 和 pad,pad 是用于 CPU cache line 对齐,主要用于提高CPU 缓存命中率,避免被频繁对象被频繁淘汰。关于 cache line 可以看一下 Wikipedia 的说明 。
poolLocal 有一个 pad 属性,从这个属性的定义方式来看,明显是为了凑齐了 128 Byte 的整数倍。为什么会这么做呢?
这里是为了避免 CPU Cache 的 false sharing 问题:CPU Cache Line 通常是以 64 byte 或 128 byte 为单位。在我们的场景中,各个 P 的 poolLocal 是以数组形式存在一起。假设 CPU Cache Line 为 128 byte,而 poolLocal 不足 128 byte 时,那 cacheline 将会带上其他 P 的 poolLocal 的内存数据,以凑齐一整个 Cache Line。如果这时,两个相邻的 P 同时在两个不同的 CPU 核上运行,将会同时去覆盖刷新 CacheLine,造成 Cacheline 的反复失效,那 CPU Cache 将失去了作用。
CPU Cache 是距离 CPU 最近的 cache,如果能将其运用好,会极大提升程序性能。Golang 这里为了防止出现 false sharing 问题,主动使用 pad 的方式凑齐 128 个 byte 的整数倍,这样就不会和其他 P 的 poolLocal 共享一套 CacheLine。
poolLocalInternal 包含两个对象:
private:私有对象,如果私有对象有值,Get 和 Put 都会优先存取 private 变量,如果 private 变量可以满足,则不再进行其它操作。
shared:类型为 poolChain,是 P 的对象池,是个链表结构,当前 P 可以从链表头进行 Push 和 Pop,其它 P 可从链表尾部进行 Pop,这样主要是为了减少锁竞争。
接下来看一下 poolChain 的机构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// poolChain is a dynamically-sized version of poolDequeue.
//
// This is implemented as a doubly-linked list queue of poolDequeues
// where each dequeue is double the size of the previous one. Once a
// dequeue fills up, this allocates a new one and only ever pushes to
// the latest dequeue. Pops happen from the other end of the list and
// once a dequeue is exhausted, it gets removed from the list.
type poolChain struct {
// head is the poolDequeue to push to. This is only accessed
// by the producer, so doesn't need to be synchronized.
head *poolChainElt
// tail is the poolDequeue to popTail from. This is accessed
// by consumers, so reads and writes must be atomic.
tail *poolChainElt
}
poolChain 的头和尾分别指向 poolChainElt,head 只能被 当前 P 自己访问,即生产者往链表头存数据,而链表尾 tail,可被多个 P 访问,即其它 P 来偷取数据的场景,所以需要加锁。
poolChainElt 结构如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type poolChainElt struct {
poolDequeue
// next and prev link to the adjacent poolChainElts in this
// poolChain.
//
// next is written atomically by the producer and read
// atomically by the consumer. It only transitions from nil to
// non-nil.
//
// prev is written atomically by the consumer and read
// atomically by the producer. It only transitions from
// non-nil to nil.
next, prev *poolChainElt
}
poolChainElt 是个双端来链表,对应的对象元素是 poolDequeue,poolDequeue结构如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// poolDequeue is a lock-free fixed-size single-producer,
// multi-consumer queue. The single producer can both push and pop
// from the head, and consumers can pop from the tail.
//
// It has the added feature that it nils out unused slots to avoid
// unnecessary retention of objects. This is important for sync.Pool,
// but not typically a property considered in the literature.
type poolDequeue struct {
// headTail packs together a 32-bit head index and a 32-bit
// tail index. Both are indexes into vals modulo len(vals)-1.
//
// tail = index of oldest data in queue
// head = index of next slot to fill
//
// Slots in the range [tail, head) are owned by consumers.
// A consumer continues to own a slot outside this range until
// it nils the slot, at which point ownership passes to the
// producer.
//
// The head index is stored in the most-significant bits so
// that we can atomically add to it and the overflow is
// harmless.
headTail uint64
// vals is a ring buffer of interface{} values stored in this
// dequeue. The size of this must be a power of 2.
//
// vals[i].typ is nil if the slot is empty and non-nil
// otherwise. A slot is still in use until *both* the tail
// index has moved beyond it and typ has been set to nil. This
// is set to nil atomically by the consumer and read
// atomically by the producer.
vals []eface
}
通过官方注释可知, poolDequeue 是一个 ring buffer 结构,ring buffer 有如下优点:
- 预先分配好内存,且分配的内存项可不断复用。
- 由于 ring buffer ==本质上是个数组==,是连续内存结构,非常利于 CPU Cache。在访问poolDequeue 某一项时,其附近的数据项都有可能加载到统一 Cache Line 中,访问速度更快。
poolDequeue 作为一个 ring buffer,自然需要记录下其 head 和 tail 的值。但在 poolDequeue 的定义中,head 和 tail 并不是独立的两个变量,只有一个 uint64 的 headTail 变量。
这是因为 headTail 变量将 head 和 tail 打包在了一起:其中高 32 位是 head 变量,低 32 位是 tail 变量。如下图所示:
HeadTail 打包过程
1
2
3
4
5
func (d *poolDequeue) pack(head, tail uint32) uint64 {
const mask = 1<<dequeueBits - 1
return (uint64(head) << dequeueBits) |
uint64(tail&mask)
}
HeadTail 解包过程
1
2
3
4
5
6
func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
const mask = 1<<dequeueBits - 1
head = uint32((ptrs >> dequeueBits) & mask)
tail = uint32(ptrs & mask)
return
}
为什么会有这个复杂的打包操作呢?这个其实是个非常常见的 lock free 优化手段。对于一个 poolDequeue 来说,可能会被多个 P 同时访问,这个时候就会带来并发问题。
例如:当 ring buffer 空间仅剩一个的时候,即 head - tail = 1
。 如果多个 P 同时访问 ring buffer,在没有任何并发措施的情况下,两个 P 都可能会拿到对象,这肯定是不符合预期的。
在不引入 Mutex 锁的前提下,sync.Pool 是怎么实现的呢?sync.Pool 利用了 atomic 包中的 CAS 操作。两个 P 都可能会拿到对象,但在最终设置 headTail 的时候,只会有一个 P 调用 CAS 成功,另外一个 CAS 失败。
1
atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2)
Put 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Put adds x to the pool.
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
if race.Enabled {
if fastrand()%4 == 0 {
// Randomly drop x on floor.
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
l, _ := p.pin()
if l.private == nil {
l.private = x
x = nil
}
if x != nil {
l.shared.pushHead(x)
}
runtime_procUnpin()
if race.Enabled {
race.Enable()
}
}
从以上代码可以看到,在 Put 函数中首先调用了 pin()
。pin
函数非常重要,它有三个作用:
- 初始化或者重新创建local数组。 当 local 数组为空,或者和当前的
runtime.GOMAXPROCS
不一致时,将触发重新创建 local 数组,以和 P 的个数保持一致。 - 取当前 P 对应的本地缓存池 poolLocal。其实代码逻辑很简单,就是从 local 数组中根据索引取元素。
- 防止当前 P 被抢占。 这点非常重要。在 Go 1.14 以后,Golang 实现了抢占式调度:一个 goroutine 占用 P 时间过长,将会被调度器强制挂起。如果一个 goroutine 在执行 Put 或者 Get 期间被挂起,有可能下次恢复时,绑定就不是上次的 P 了。那整个过程就会完全乱掉。因此,这里使用了 runtime 包里面的 procPin,暂时不允许 P 被抢占。
pin 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// pin pins the current goroutine to P, disables preemption and
// returns poolLocal pool for the P and the P's id.
// Caller must call runtime_procUnpin() when done with the pool.
func (p *Pool) pin() (*poolLocal, int) {
pid := runtime_procPin()
// In pinSlow we store to local and then to localSize, here we load in opposite order.
// Since we've disabled preemption, GC cannot happen in between.
// Thus here we must observe local at least as large localSize.
// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
return p.pinSlow()
}
接着,Put 函数会优先设置当前 poolLocal 私有变量 private
。如果设置私有变量成功,那么将不会往 shared 缓存池写了。这样操作效率会更高效。
如果私有变量之前已经设置过了,那就只能往当前 P 的本地缓存池 poolChain 里面写了。我们接下来看下,sync.Pool 的每个 P 的内部缓存池 poolChain 是怎么实现的。
在 Put 的时候,会去直接取 poolChain 的链表头元素 HEAD:
- 如果 HEAD 不存在 ,则新建一个 buffer 长度为 8 的 poolDequeue,并将对象放置在里面。
- 如果 HEAD 存在,且 buffer 尚未满,则将元素直接放置在 poolDequeue 中。
- 如果 HEAD 存在,但 buffer 满了,则新建一个新的 poolDequeue,长度为上个 HEAD 的 2 倍。同时,将 poolChain 的 HEAD 指向新的元素。
Put 的过程比较简单,整个过程不需要和其他 P 的 poolLocal 进行交互。
Get 实现
通过 Get 操作,可以从 sync.Pool 中获取一个对象。相比于 Put 函数,Get 的实现更为复杂。不仅涉及到对当前 P 本地对象池的操作,还涉及对其他 P 的本地对象池的对象窃取。其代码逻辑如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// Get selects an arbitrary item from the Pool, removes it from the
// Pool, and returns it to the caller.
// Get may choose to ignore the pool and treat it as empty.
// Callers should not assume any relation between values passed to Put and
// the values returned by Get.
//
// If Get would otherwise return nil and p.New is non-nil, Get returns
// the result of calling p.New.
func (p *Pool) Get() interface{} {
if race.Enabled {
race.Disable()
}
l, pid := p.pin()
x := l.private
l.private = nil
if x == nil {
// Try to pop the head of the local shard. We prefer
// the head over the tail for temporal locality of
// reuse.
x, _ = l.shared.popHead()
if x == nil {
x = p.getSlow(pid)
}
}
runtime_procUnpin()
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
if x == nil && p.New != nil {
x = p.New()
}
return x
}
其中 pin()
的作用和 private
对象的作用,和 PUT 操作中的一致。
首先,Get 函数会尝试从当前 P 的 本地对象池 poolChain 中获取对象。从当前 P 的 poolChain 中取数据时,是从链表头部开始取数据。 具体来说,先取位于链表头的 poolDequeue,然后从 poolDequeue 的头部开始取数据。
如果从当前 P 的 poolChain 取不到数据,意味着当前 P 的缓存池为空,那么将尝试从其他 P 的缓存池中 窃取对象。这也对应 getSlow 函数的内部实现。
在 getSlow 函数,会将当前 P 的索引值不断递增,逐个尝试从其他 P 的 poolChain 中取数据。==注意,当尝试从其他 P 的 poolChain 中取数据时,是从链表尾部开始取的==。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (p *Pool) getSlow(pid int) interface{} {
// See the comment in pin regarding ordering of the loads.
size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// Try to steal one element from other procs.
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// Try the victim cache. We do this after attempting to steal
// from all primary caches because we want objects in the
// victim cache to age out if at all possible.
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// Mark the victim cache as empty for future gets don't bother
// with it.
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
在对其他 P 的 poolChain 调用 popTail,会先取位于链表尾部的 poolDequeue,然后从 poolDequeue 的尾部开始取数据。如果从这个 poolDequeue 中取不到数据,则意味着该 poolDequeue 为空,则直接从该 poolDequeue 从 poolChain 中移除,同时尝试下一个 poolDequeue。
如果从其他 P 的本地对象池,也拿不到数据。接下来会尝试从 victim 中取数据。上文讲到 victim 是上一轮被清理的对象池, 从 victim 取对象也是 popTail 的方式。
最后,如果所有的缓存池都都没有数据了,这个时候会调用用户设置的 New
函数,创建一个新的对象。
sync.Pool 在设计的时候,当操作本地的 poolChain 时,无论是 push 还是 pop,都是从头部开始。而当从其他 P 的 poolChain 获取数据,只能从尾部 popTail 取。这样可以尽量减少并发冲突。
对象的清理
sync.Pool 没有对外开放对象清理策略和清理接口。我们上面讲到,当窃取其他 P 的对象时,会逐步淘汰已经为空的 poolDequeue。但除此之外,sync.Pool 一定也还有其他的对象清理机制,否则对象池将可能会无限制的膨胀下去,造成内存泄漏。
Golang 对 sync.Pool 的清理逻辑非常简单粗暴。首先每个被使用的 sync.Pool,都会在初始化阶段被添加到全局变量 allPools []*Pool
对象中。Golang 的 runtime 将会在 每轮 GC 前,触发调用 poolCleanup 函数,清理 allPools。代码逻辑如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func init() {
runtime_registerPoolCleanup(poolCleanup)
}
func poolCleanup() {
// This function is called with the world stopped, at the beginning of a garbage collection.
// It must not allocate and probably should not call any runtime functions.
// Because the world is stopped, no pool user can be in a
// pinned section (in effect, this has all Ps pinned).
// Drop victim caches from all pools.
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// Move primary cache to victim cache.
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
// The pools with non-empty primary caches now have non-empty
// victim caches and no pools have primary caches.
oldPools, allPools = allPools, nil
}
这里需要正式介绍下 sync.Pool 的 victim(牺牲者) 机制,我们在 Get 函数的对象窃取逻辑中也有提到 victim。
在每轮 sync.Pool 的清理中,暂时不会完全清理对象池,而是将其放在 victim 中。等到下一轮清理,才完全清理掉 victim。也就是说,每轮 GC 后 sync.Pool 的对象池都会转移到 victim 中,同时将上一轮的 victim 清空掉。
为什么这么做呢? 这是因为 Golang 为了防止 GC 之后 sync.Pool 被突然清空,对程序性能造成影响。因此先利用 victim 作为过渡,如果在本轮的对象池中实在取不到数据,也可以从 victim 中取,这样程序性能会更加平滑。
victim 机制最早用在 CPU Cache 中,详细可以阅读这篇 wiki: Victim_cache。
引用
https://en.wikipedia.org/wiki/CPU_cache
https://en.wikipedia.org/wiki/Victim_cache
https://www.cyhone.com/articles/think-in-sync-pool