Golang 并发编程之 sync 包

Golang 的 sync 包

在Go语言(Golang)中,sync 包用于在多个 Goroutine 之间同步操作。
为避免协程并发对共享数据的操作出现的竞争,导致数据错误,所以需要使用到 sync

sync 包简介

sync 包包含以下主要类型:

  • WaitGroup
  • Mutex
  • RWMutex
  • Pool
  • Once
  • Cond

以下将逐一介绍这些类型的用法和示例。

WaitGroup

简介

WaitGroup 用于等待一组 Goroutine 完成。WaitGroup 通过计数器跟踪 Goroutine 的数量。

  • wg.Add() :增加计数
  • wg.Done():减少计数
  • wg.Wait():阻塞,直到计数归0

用法

// WaitGroup
func wgWorker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 完成一个任务,计数器-1

    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second) // 模拟耗时操作
    fmt.Printf("Worker %d done\n", id)
}

// 测试 WaitGroup
func TestWaitGroup(t *testing.T) {
    wg := &sync.WaitGroup{}
    for i := 1; i <= 3; i++ {
        wg.Add(1) // 新增一个任务,计数器+1
        go wgWorker(i, wg)
    }
    wg.Wait()
}

输出如下,可见全部任务执行完毕:

Worker 3 starting
Worker 2 starting
Worker 1 starting
Worker 1 done
Worker 3 done
Worker 2 done

Mutex 互斥锁

简介

Mutex 是一个互斥锁,用于在 Goroutine 之间保护共享资源。
只有一个 Goroutine 可以持有 Mutex 锁,其他 Goroutine 必须等待该锁释放。

特点

  1. 独占:

    • 互斥锁只能被一个 goroutine 持有,其他 goroutine 必须等待锁释放后才能继续执行。
  2. 避免竞态条件:

    • 互斥锁能防止多个 goroutine 同时修改共享资源,从而避免数据竞争。
  3. 性能开销:

    • Mutex 在高并发场景下可能会成为性能瓶颈,特别是在读操作频繁的场景下。
    • 因为每次读写操作都需要独占锁,这可能导致较高的锁竞争。
  4. 死锁风险:

    • 死锁通常发生在多个线程之间互相等待对方释放锁的情况。

    正确的使用姿势,避免死锁风险:
    使用 defer 语句可以自动解锁,即使在函数中发生异常或提前返回,锁也会被正确释放。

    mutex.Lock()
    defer mutex.Unlock()

用法

  • mu.Lock(): 上锁
  • mu.Unlock(): 解锁
  • mu.TryLock(): 尝试上锁,如果锁被其他Goroutine持有,则返回false,否则返回true

一个简单的计数,如果没有使用互斥锁,那么count的结果将不固定。


// Mutex 互斥锁
func TestMutex(t *testing.T) {
    var (
        wg  sync.WaitGroup
        mu  sync.Mutex
        sum int
    )

    for i := 0; i < 1000; i++ {
        wg.Add(1) // 计数+1
        go func() {
            defer wg.Done()   // 计数-1
            mu.Lock()         // 上锁
            defer mu.Unlock() // 解锁

            sum++
        }()
    }
    wg.Wait()        // 阻塞等待计数归零
    fmt.Println(sum) // 1000
}

测试耗时


// 互斥锁 测试耗时
func TestMutexCost(t *testing.T) {
    var (
        wg      sync.WaitGroup
        mu      sync.Mutex
        sum     int
        readSum int
    )
    // 记录耗时
    now := time.Now()
    for i := 0; i < 1000; i++ {
        wg.Add(1) // 计数+1
        go func() {
            defer wg.Done()              // 计数-1
            mu.Lock()                    // 上锁
            defer mu.Unlock()            // 解锁
            time.Sleep(time.Millisecond) // 模拟耗时
            sum++                        // 写sum
        }()
    }
    for i := 0; i < 1000000; i++ {
        wg.Add(1) // 计数+1
        go func() {
            defer wg.Done()   // 计数-1
            mu.Lock()         // 上锁
            defer mu.Unlock() // 解锁
            readSum = sum     // 读
        }()
    }

    wg.Wait()                          // 阻塞等待计数归零
    fmt.Println("耗时", time.Since(now)) // 2.018157625s
    fmt.Println(sum, readSum)          // 1000
}

在我的设备上,这段程序的耗时基本在 2.018157625s 左右。

思考

  1. 读数据时,是否需要加锁?
    • 不一致的读取:
      在没有锁的情况下,readData 可能会在 sharedData 被写入到一半时读取数据,这样读取到的值可能是错误的或不完整的。
    • 难以调试的问题:
      由于读取操作和写入操作是并发进行的,数据竞争可能不会每次都发生,但它会在某些特定情况下导致随机和不稳定的行为,难以复现和调试。
    • 崩溃或未定义行为:
      在极端情况下,内存模型被破坏,可能导致程序崩溃或产生未定义的行为。

RWMutex 读写互斥锁

简介

读写锁 RWMutex 是一种同步机制,它允许并发的读操作,但写操作是独占的。
读写锁的特点使其非常适合于那些读操作远多于写操作的场景。

  • rw.Lock() 获取写锁,直到写锁被释放。
  • rw.Unlock() 释放写锁。
  • rw.TryLock() 尝试获取写锁,如果获取失败则返回false。
  • rw.RLock() 获取读锁。
  • rw.RUnlock() 释放读锁。
  • rw.TryRLock() 尝试获取读锁,如果获取失败则返回false。

特点

  1. 允许多个读操作并发。
  2. 写操作独占。
  3. 避免竞态条件。
  4. 死锁风险:
    不正确使用也会发生死锁。解锁使用 defer 是一种好的习惯。
  5. 更高的并发性能
    对比互斥锁,读写互斥锁能够提升并发性能,因为读操作可以并行执行,而互斥锁则需要等待写操作完成才能继续执行。

用法 & 测试耗时

顺便测试一下耗时


// 读写互斥锁 , 测试耗时
func TestRWMutexCost(t *testing.T) {
    var (
        wg      sync.WaitGroup
        rw      sync.RWMutex
        sum     int
        readSum int
    )
    // 当前时间
    now := time.Now()
    // 循环
    for i := 0; i < 1000; i++ {
        wg.Add(1) // 计数+1
        go func() {
            defer wg.Done()              // 计数-1
            rw.Lock()                    // 上锁
            defer rw.Unlock()            // 解锁
            time.Sleep(time.Millisecond) // 模拟耗时
            sum++                        // 写sum
        }()
    }
    for i := 0; i < 1000000; i++ {
        wg.Add(1) // 计数+1
        go func() {
            defer wg.Done()    // 计数-1
            rw.RLock()         // 上读锁
            defer rw.RUnlock() // 解读锁
            readSum = sum      // 读sum
        }()
    }
    wg.Wait() // 阻塞等待计数归零
    fmt.Println("耗时", time.Since(now)) // 1.406641542s
    fmt.Println(sum, readSum) //
}

在我的设备上,这段程序的耗时基本在 1.406641542s 左右。
对比 Mutex 的耗时,可见在 读多写少 时 RWMutex 的性能比 Mutex 更好。

Once

简介

Once 确保一个函数只执行一次。无论多少个 Goroutine 调用 Do 方法,指定的函数只会被执行一次。

  • once.Do() :执行操作

用法


var once sync.Once

func initialize() {
    fmt.Println("文章来源:fanfine.cn")
}

func TestOnce(t *testing.T) {
    wg := sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func() {
            once.Do(initialize)
            wg.Done()
        }()
    }
    wg.Wait()
}

最终只会执行一次。

场景

  1. mysql、redis、mongo等数据库连接
  2. 只需执行一次的初始化操作

Pool

简介

sync.Pool 维护一个对象池,当不再需要某个对象时,你可以将其放入池中,供后续使用时取出。
这样做的好处是减少了内存分配和垃圾回收的开销,提高了程序的性能。

  • pool.New 存储初始化对象的函数,当池中没有对象可以用该函数创建新的对象。
  • pool.Get() 从对象池中取出对象
  • pool.Put() 放入对象到对象池中

使用方法:

有个Struct

首先我们有一个 struct , 比如是常用的 Context
并为其实现 new 、 reset 。


// pool
type Context struct {
    *gin.Context
    // 其他字段
    Domain string
}

func newContext() *Context {
    return &Context{}
}

// 将上下文重置
func (c *Context) reset() {
    c.Context = nil
    // 其他字段重置
    c.Domain = ""
}

初始化一个全局变量 pool

初始化一个全局变量 pool,并且设置其 New 函数。


/*
上下文的池 全局变量
在 New() 的函数中,可以把一些通用的变量初始化,
这样,在 Get() 的时候,就不需要再初始化,直接从池中获取。
*/
var pool = sync.Pool{
    New: func() interface{} {
        ctx := newContext()
        // 数据库连接等其他字段赋值
        ctx.Domain = "fanfine.cn"
        return ctx
    },
}

实现 PutContext()

对于从sync.Pool中获取的对象,你应该确保它们的状态在每次放入池中之前被重置,
这样下次取出时对象是干净的并且可以安全地重用。
sync.Pool是线程安全的,多个goroutine可以同时从同一个sync.Pool实例中获取和放回对象。

// 放回池
func PutContext(c *Context) {
    // 重置上下文
    c.reset()
    // 放回池中
    pool.Put(c)
}

实现 GetContext()

Get 从池中获取一个对象,并将上下文赋值给对象。
sync.Pool的New方法用于指定如何创建新对象。
这个方法返回一个 func() interface{}类型的函数,
在有了泛型后,就变成了 func() any,
当从池中取对象而池为空时,会调用这个函数来创建新对象。

// 获取一个上下文
func GetContext(c *gin.Context) *Context {
    ctx := pool.Get().(*Context)
    ctx.Context = c
    return ctx
}

在业务中使用

func TestPool(t *testing.T) {
    ctx := GetContext(nil)
    // 假装执行很多操作。。。。。
    time.Sleep(time.Second * 1)
    // 放回池中
    PutContext(ctx)
}

使用场景

在实际应用中,sync.Pool常用于管理那些创建成本较高,但使用频率又很高的对象,如 上下文

Cond

简介

sync.Cond(条件变量)是一个同步原语,用于实现更复杂的线程间同步机制。
它通常与sync.Mutex或sync.RWMutex一起使用,以协调多个goroutine之间的执行。

  • sync.Cond的主要作用是:

    • 阻塞goroutine:当某个条件不满足时,可以让一个或多个goroutine进入等待状态,直到条件被满足。
    • 唤醒goroutine:当条件满足时,可以唤醒一个或所有等待中的goroutine继续执行。

用法

广播 Broadcast

  • cond.Broadcast()

断点观测运行:

断点动图

// 多个携程等待cond的广播
func TestCondBroadcast(t *testing.T) {
    var (
        wg   = sync.WaitGroup{}
        cond = sync.NewCond(&sync.Mutex{})
        sum  = 0
    )

    // 3个协程接收广播
    for i := 0; i < 3000; i++ {
        wg.Add(1) // wg计数 +1
        go func() {
            defer wg.Done() // 完成后wg计数 -1

            cond.L.Lock() // 获取锁,会在 cond.Wait() 中被释放
            defer func() {
                cond.L.Unlock() // 释放锁, 释放的是在 cond.Wait() 中获取的锁
            }()
            cond.Wait() // 等待信号
            // 执行业务逻辑
            sum++

        }()
    }
    time.Sleep(time.Second * 1) // 避免太快进行到广播
    go func() {
        cond.Broadcast()
    }()
    wg.Wait()
    fmt.Println("sum =", sum) // sum = 3000
}

信号 Signal

  • cond.Signal()

可以只让一个协程执行,其他协程继续等待。


func TestCondSingal(t *testing.T) {
    var (
        cond = sync.NewCond(&sync.Mutex{})
        wait = true
        sum  = 0
    )

    // 3个协程接收广播
    for i := 0; i < 999999; i++ {

        go func() {

            cond.L.Lock() // 获取锁,会在 cond.Wait() 中被释放
            defer func() {
                cond.L.Unlock() // 释放锁, 释放的是在 cond.Wait() 中获取的锁
            }()

            for wait {
                cond.Wait()
            }
            // 执行业务逻辑
            sum++
            fmt.Println("执行了一次")
        }()
    }

    go func() {
        /*
             此处加sleep 是为了确保上述循环中的协程已经全部执行了 cond.Wait() ,
            否则会直接执行业务代码,自然要避免这个事情。
        */
        time.Sleep(time.Second * 1)
        /*
            此处的 wait 为 false 的必要性:
            如果wait为true,在cond.Wait()中,接收到信号,停止阻塞并推出,
            此时 wait 为 true,会再次进入 cond.Wait(),并阻塞,等待下一次信号
            运行不到sum++
        */
        wait = false

        cond.Signal()
    }()

    // 等待,确保goroutine 执行完毕
    time.Sleep(time.Second * 2)
    fmt.Println("sum =", sum)
}

思考:

  • wait 变量的存在是否有必要呢?
    因为 cond.Wait()的源码中有提出使用条件:

    // c.L.Lock()
    // for !condition() {
    // c.Wait()
    // }
    一些相关的教程中,也会使用相关代码。

在我看来没有必要,理由如下:

  1. 增加了代码出风险的可能性
    比如忘记修改wait的值,导致程序无法继续往下执行。详情见代码中的注释 ”此处的 wait 为 false 的必要性“ 处
    而不存在wait的控制的话,一定会执行到cond.Wait(), 然后等待信号,然后退出。
    所以不会在没有信号的情况下就执行业务逻辑。

  2. 是否存在过期所以需要循环刷新?
    同时我了解了下, cond.Wait() 内部的核心机制不会超时或过期,而是会一直等待信号,所以也没有让其不断循环的必要。
    使其循环便是徒增功耗了。

  • 可以使用cond的机制,可以作为简单的消息队列。

本文由 上传。


如果您喜欢这篇文章,请点击链接 Golang 并发编程之 sync 包 查看原文。


您也可以直接访问:https://www.fanfine.cn/blog/147

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇