Golang 的 sync 包
在Go语言(Golang)中,sync 包用于在多个 Goroutine 之间同步操作。
为避免协程并发对共享数据的操作出现的竞争,导致数据错误,所以需要使用到 sync 。
sync 包简介
sync 包包含以下主要类型:
WaitGroupMutexRWMutexPoolOnceCond
以下将逐一介绍这些类型的用法和示例。
WaitGroup
简介
WaitGroup 用于等待一组 Goroutine 完成。WaitGroup 通过计数器跟踪 Goroutine 的数量。
wg.Add():增加计数wg.Done():减少计数wg.Wait():阻塞,直到计数归0
用法
// WaitGroup
func wgWorker(id int, wg *sync.WaitGroup) {
defer wg.Done() // 完成一个任务,计数器-1
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second) // 模拟耗时操作
fmt.Printf("Worker %d done\n", id)
}
// 测试 WaitGroup
func TestWaitGroup(t *testing.T) {
wg := &sync.WaitGroup{}
for i := 1; i <= 3; i++ {
wg.Add(1) // 新增一个任务,计数器+1
go wgWorker(i, wg)
}
wg.Wait()
}
输出如下,可见全部任务执行完毕:
Worker 3 starting
Worker 2 starting
Worker 1 starting
Worker 1 done
Worker 3 done
Worker 2 done
Mutex 互斥锁
简介
Mutex 是一个互斥锁,用于在 Goroutine 之间保护共享资源。
只有一个 Goroutine 可以持有 Mutex 锁,其他 Goroutine 必须等待该锁释放。
特点
-
独占:
- 互斥锁只能被一个 goroutine 持有,其他 goroutine 必须等待锁释放后才能继续执行。
-
避免竞态条件:
- 互斥锁能防止多个 goroutine 同时修改共享资源,从而避免数据竞争。
-
性能开销:
- Mutex 在高并发场景下可能会成为性能瓶颈,特别是在读操作频繁的场景下。
- 因为每次读写操作都需要独占锁,这可能导致较高的锁竞争。
-
死锁风险:
- 死锁通常发生在多个线程之间互相等待对方释放锁的情况。
正确的使用姿势,避免死锁风险:
使用 defer 语句可以自动解锁,即使在函数中发生异常或提前返回,锁也会被正确释放。mutex.Lock() defer mutex.Unlock()
用法
mu.Lock(): 上锁mu.Unlock(): 解锁mu.TryLock(): 尝试上锁,如果锁被其他Goroutine持有,则返回false,否则返回true
一个简单的计数,如果没有使用互斥锁,那么count的结果将不固定。
// Mutex 互斥锁
func TestMutex(t *testing.T) {
var (
wg sync.WaitGroup
mu sync.Mutex
sum int
)
for i := 0; i < 1000; i++ {
wg.Add(1) // 计数+1
go func() {
defer wg.Done() // 计数-1
mu.Lock() // 上锁
defer mu.Unlock() // 解锁
sum++
}()
}
wg.Wait() // 阻塞等待计数归零
fmt.Println(sum) // 1000
}
测试耗时
// 互斥锁 测试耗时
func TestMutexCost(t *testing.T) {
var (
wg sync.WaitGroup
mu sync.Mutex
sum int
readSum int
)
// 记录耗时
now := time.Now()
for i := 0; i < 1000; i++ {
wg.Add(1) // 计数+1
go func() {
defer wg.Done() // 计数-1
mu.Lock() // 上锁
defer mu.Unlock() // 解锁
time.Sleep(time.Millisecond) // 模拟耗时
sum++ // 写sum
}()
}
for i := 0; i < 1000000; i++ {
wg.Add(1) // 计数+1
go func() {
defer wg.Done() // 计数-1
mu.Lock() // 上锁
defer mu.Unlock() // 解锁
readSum = sum // 读
}()
}
wg.Wait() // 阻塞等待计数归零
fmt.Println("耗时", time.Since(now)) // 2.018157625s
fmt.Println(sum, readSum) // 1000
}
在我的设备上,这段程序的耗时基本在 2.018157625s 左右。
思考
- 读数据时,是否需要加锁?
- 不一致的读取:
在没有锁的情况下,readData 可能会在 sharedData 被写入到一半时读取数据,这样读取到的值可能是错误的或不完整的。 - 难以调试的问题:
由于读取操作和写入操作是并发进行的,数据竞争可能不会每次都发生,但它会在某些特定情况下导致随机和不稳定的行为,难以复现和调试。 - 崩溃或未定义行为:
在极端情况下,内存模型被破坏,可能导致程序崩溃或产生未定义的行为。
- 不一致的读取:
RWMutex 读写互斥锁
简介
读写锁 RWMutex 是一种同步机制,它允许并发的读操作,但写操作是独占的。
读写锁的特点使其非常适合于那些读操作远多于写操作的场景。
rw.Lock()获取写锁,直到写锁被释放。rw.Unlock()释放写锁。rw.TryLock()尝试获取写锁,如果获取失败则返回false。rw.RLock()获取读锁。rw.RUnlock()释放读锁。rw.TryRLock()尝试获取读锁,如果获取失败则返回false。
特点
- 允许多个读操作并发。
- 写操作独占。
- 避免竞态条件。
- 死锁风险:
不正确使用也会发生死锁。解锁使用defer是一种好的习惯。 - 更高的并发性能
对比互斥锁,读写互斥锁能够提升并发性能,因为读操作可以并行执行,而互斥锁则需要等待写操作完成才能继续执行。
用法 & 测试耗时
顺便测试一下耗时
// 读写互斥锁 , 测试耗时
func TestRWMutexCost(t *testing.T) {
var (
wg sync.WaitGroup
rw sync.RWMutex
sum int
readSum int
)
// 当前时间
now := time.Now()
// 循环
for i := 0; i < 1000; i++ {
wg.Add(1) // 计数+1
go func() {
defer wg.Done() // 计数-1
rw.Lock() // 上锁
defer rw.Unlock() // 解锁
time.Sleep(time.Millisecond) // 模拟耗时
sum++ // 写sum
}()
}
for i := 0; i < 1000000; i++ {
wg.Add(1) // 计数+1
go func() {
defer wg.Done() // 计数-1
rw.RLock() // 上读锁
defer rw.RUnlock() // 解读锁
readSum = sum // 读sum
}()
}
wg.Wait() // 阻塞等待计数归零
fmt.Println("耗时", time.Since(now)) // 1.406641542s
fmt.Println(sum, readSum) //
}
在我的设备上,这段程序的耗时基本在 1.406641542s 左右。
对比 Mutex 的耗时,可见在 读多写少 时 RWMutex 的性能比 Mutex 更好。
Once
简介
Once 确保一个函数只执行一次。无论多少个 Goroutine 调用 Do 方法,指定的函数只会被执行一次。
once.Do():执行操作
用法
var once sync.Once
func initialize() {
fmt.Println("文章来源:fanfine.cn")
}
func TestOnce(t *testing.T) {
wg := sync.WaitGroup{}
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
once.Do(initialize)
wg.Done()
}()
}
wg.Wait()
}
最终只会执行一次。
场景
- mysql、redis、mongo等数据库连接
- 只需执行一次的初始化操作
Pool
简介
sync.Pool 维护一个对象池,当不再需要某个对象时,你可以将其放入池中,供后续使用时取出。
这样做的好处是减少了内存分配和垃圾回收的开销,提高了程序的性能。
pool.New存储初始化对象的函数,当池中没有对象可以用该函数创建新的对象。pool.Get()从对象池中取出对象pool.Put()放入对象到对象池中
使用方法:
有个Struct
首先我们有一个 struct , 比如是常用的 Context
并为其实现 new 、 reset 。
// pool
type Context struct {
*gin.Context
// 其他字段
Domain string
}
func newContext() *Context {
return &Context{}
}
// 将上下文重置
func (c *Context) reset() {
c.Context = nil
// 其他字段重置
c.Domain = ""
}
初始化一个全局变量 pool
初始化一个全局变量 pool,并且设置其 New 函数。
/*
上下文的池 全局变量
在 New() 的函数中,可以把一些通用的变量初始化,
这样,在 Get() 的时候,就不需要再初始化,直接从池中获取。
*/
var pool = sync.Pool{
New: func() interface{} {
ctx := newContext()
// 数据库连接等其他字段赋值
ctx.Domain = "fanfine.cn"
return ctx
},
}
实现 PutContext()
对于从sync.Pool中获取的对象,你应该确保它们的状态在每次放入池中之前被重置,
这样下次取出时对象是干净的并且可以安全地重用。
sync.Pool是线程安全的,多个goroutine可以同时从同一个sync.Pool实例中获取和放回对象。
// 放回池
func PutContext(c *Context) {
// 重置上下文
c.reset()
// 放回池中
pool.Put(c)
}
实现 GetContext()
Get 从池中获取一个对象,并将上下文赋值给对象。
sync.Pool的New方法用于指定如何创建新对象。
这个方法返回一个 func() interface{}类型的函数,
在有了泛型后,就变成了 func() any,
当从池中取对象而池为空时,会调用这个函数来创建新对象。
// 获取一个上下文
func GetContext(c *gin.Context) *Context {
ctx := pool.Get().(*Context)
ctx.Context = c
return ctx
}
在业务中使用
func TestPool(t *testing.T) {
ctx := GetContext(nil)
// 假装执行很多操作。。。。。
time.Sleep(time.Second * 1)
// 放回池中
PutContext(ctx)
}
使用场景
在实际应用中,sync.Pool常用于管理那些创建成本较高,但使用频率又很高的对象,如 上下文。
Cond
简介
sync.Cond(条件变量)是一个同步原语,用于实现更复杂的线程间同步机制。
它通常与sync.Mutex或sync.RWMutex一起使用,以协调多个goroutine之间的执行。
-
sync.Cond的主要作用是:
- 阻塞goroutine:当某个条件不满足时,可以让一个或多个goroutine进入等待状态,直到条件被满足。
- 唤醒goroutine:当条件满足时,可以唤醒一个或所有等待中的goroutine继续执行。
用法
广播 Broadcast
cond.Broadcast()
断点观测运行:

// 多个携程等待cond的广播
func TestCondBroadcast(t *testing.T) {
var (
wg = sync.WaitGroup{}
cond = sync.NewCond(&sync.Mutex{})
sum = 0
)
// 3个协程接收广播
for i := 0; i < 3000; i++ {
wg.Add(1) // wg计数 +1
go func() {
defer wg.Done() // 完成后wg计数 -1
cond.L.Lock() // 获取锁,会在 cond.Wait() 中被释放
defer func() {
cond.L.Unlock() // 释放锁, 释放的是在 cond.Wait() 中获取的锁
}()
cond.Wait() // 等待信号
// 执行业务逻辑
sum++
}()
}
time.Sleep(time.Second * 1) // 避免太快进行到广播
go func() {
cond.Broadcast()
}()
wg.Wait()
fmt.Println("sum =", sum) // sum = 3000
}
信号 Signal
cond.Signal()
可以只让一个协程执行,其他协程继续等待。
func TestCondSingal(t *testing.T) {
var (
cond = sync.NewCond(&sync.Mutex{})
wait = true
sum = 0
)
// 3个协程接收广播
for i := 0; i < 999999; i++ {
go func() {
cond.L.Lock() // 获取锁,会在 cond.Wait() 中被释放
defer func() {
cond.L.Unlock() // 释放锁, 释放的是在 cond.Wait() 中获取的锁
}()
for wait {
cond.Wait()
}
// 执行业务逻辑
sum++
fmt.Println("执行了一次")
}()
}
go func() {
/*
此处加sleep 是为了确保上述循环中的协程已经全部执行了 cond.Wait() ,
否则会直接执行业务代码,自然要避免这个事情。
*/
time.Sleep(time.Second * 1)
/*
此处的 wait 为 false 的必要性:
如果wait为true,在cond.Wait()中,接收到信号,停止阻塞并推出,
此时 wait 为 true,会再次进入 cond.Wait(),并阻塞,等待下一次信号
运行不到sum++
*/
wait = false
cond.Signal()
}()
// 等待,确保goroutine 执行完毕
time.Sleep(time.Second * 2)
fmt.Println("sum =", sum)
}
思考:
wait变量的存在是否有必要呢?
因为 cond.Wait()的源码中有提出使用条件:// c.L.Lock()
// for !condition() {
// c.Wait()
// }
一些相关的教程中,也会使用相关代码。
在我看来没有必要,理由如下:
-
增加了代码出风险的可能性
比如忘记修改wait的值,导致程序无法继续往下执行。详情见代码中的注释 ”此处的 wait 为 false 的必要性“ 处
而不存在wait的控制的话,一定会执行到cond.Wait(), 然后等待信号,然后退出。
所以不会在没有信号的情况下就执行业务逻辑。 -
是否存在过期所以需要循环刷新?
同时我了解了下,cond.Wait()内部的核心机制不会超时或过期,而是会一直等待信号,所以也没有让其不断循环的必要。
使其循环便是徒增功耗了。
- 可以使用cond的机制,可以作为简单的消息队列。

