V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
swqslwl
V2EX  ›  Go 编程语言

请教一个并发设计问题

  •  
  •   swqslwl · 355 天前 via Android · 3822 次点击
    这是一个创建于 355 天前的主题,其中的信息可能已经有所发展或是发生改变。
    我在做一个监测流量的项目。每秒会从数据源中获取 1w 条 json 格式的流量信息,我希望对这些流量进行分析,但是现在会出现丢数据的情况。

    我的做法是
    1.接受到数据后先传入 channelA
    2.启动一个协程循环从 A 中读取数据存入切片 B
    3.另起一个协程处理切片 B 的数据,同时在处理业务时利用 mutex 锁住 B

    实际调试中发现,mutex 的次数会影响数据的丢失量
    请问我这样设计是否有问题,是否会导致丢数据
    第 1 条附言  ·  355 天前
    
    var fmutex = sync.Mutex{}
    var A = make(chan string, 1048576)
    var B  = make([]flowStatistic,0)
    
    func foo(){
    	go getData()
    	go txData()
    	go handleData()
    }
    
    //接受数据
    func getData(){
    	for {
    		...
    		data := conn.Read()
    		A<-data
    	}
    }
    
    func txData(){
    	for{
    		var fs flowStatistic
    		err := json.Unmarshal([]byte(<-A), &fs) //这里不断解析A传过来的数据
    		...
    		B = append(B,fs)			//仅仅在这里会插入B
    	}
    }
    
    func handleData(){
    	//这里每5秒钟对B中的数据进行聚合并入库,耗时较多。为了不丢数据,我锁住B,处理完后清空B中数据并解锁
    	for{
    		time.Sleep(5 * time.Second)
    		fmutex.Lock()
    		...
    		B = make([]flowStatistic,0)
    		fmutex.Unlock()
    	}
    }
    
    

    有老哥提到了append B的时候应该锁住,我试了下发现实际上还是会丢数据。

    第 2 条附言  ·  355 天前
    var fmutex = sync.Mutex{}
    var A = make(chan string, 1048576)
    var B  = make([]flowStatistic,0)
    
    func foo(){
    	go getData()
    	go txData()
    	go handleData()
    }
    
    //接受数据
    func getData(){
    	for {
    		...
                    addr, err := net.ResolveUnixAddr("unixgram", sock)
    	        conn, err := net.ListenUnixgram("unixgram", addr)
    		data := conn.ReadFromUnix()
    		A<-data
    	}
    }
    
    func txData(){
    	for{
    		var fs flowStatistic
    		err := json.Unmarshal([]byte(<-A), &fs) //这里不断解析A传过来的数据
    		...
                    fmutex.Lock()
    		B = append(B,fs)			
                    fmutex.Unlock()
    	}
    }
    
    func handleData(){
    	//这里每5秒钟对B中的数据进行聚合并入库,耗时较多。为了不丢数据,我锁住B,处理完后清空B中数据并解锁
    	for{
    		time.Sleep(5 * time.Second)
    		fmutex.Lock()
    		...
    		B = make([]flowStatistic,0)
    		fmutex.Unlock()
    	}
    }
    

    改进了一下

    39 条回复    2023-05-09 11:03:17 +08:00
    RH
        1
    RH  
       355 天前
    需要 demo 才能分析,你描述的逻辑里有很多不确定性。
    lrh3321
        2
    lrh3321  
       355 天前 via Android
    要 demo, 你两个协程都会写切片 B
    ox180
        3
    ox180  
       355 天前
    @lrh3321 冒泡
    piaodazhu
        4
    piaodazhu  
       355 天前   ❤️ 1
    我就提一个可能,切片 B 扩容,导致这种特殊情况:
    时刻 1 ,goroutine1 加锁,用 B=append(B, item)向切片 B 追加一个元素。刚好触发了扩容,B 的底层数组指针发生了转移。即,append 的参数 B 和返回值 B 中的 ptr 不同。
    时刻 2 紧接着时刻 1 ,goroutine2 拿到锁,这个时候在 goroutine2 看来,B 只是一个由(size,cap,ptr)构成结构体,它察觉不到 B 底层数组指针的变化,所以看不到 goroutine1 追加的数据。

    具体可以检查一下代码。
    xuboying
        5
    xuboying  
       355 天前
    我感觉 sync.Pool 是干这个事情的。但是我一直没有掌握 sync.Pool 的正确用法,希望有大佬解释一下。
    ding2dong
        6
    ding2dong  
       355 天前
    调大 A 的 bufsize
    ding2dong
        7
    ding2dong  
       355 天前
    另外写入 B 的时候也要 mutex ,否则会被污染
    lysS
        8
    lysS  
       355 天前
    “但是现在会出现丢数据的情况” 这是为什么呢?实际没有从数据源中获取到 1w 条?
    liangkang1436
        9
    liangkang1436  
       355 天前 via Android
    有没有考虑用时序数据库来存储这些数据然后订阅? 1w/s ,这个数据量不小了
    fregie
        10
    fregie  
       355 天前 via Android
    2 中存入 b 的过程也要锁。
    其实这里不用切片用队列比较合适
    ghost024
        11
    ghost024  
       355 天前
    没看到你的代码,粗略的分析,你的第一个协程,从 channelA 中写到切片 b 也需要先获得 b 的 mutex 锁的,要不然,如果在锁 b 的时候你从 channelA 中获取数据,因为 b 锁住了,你写不进去就丢了
    WispZhan
        12
    WispZhan  
       355 天前   ❤️ 1
    The Golden Rule - Don't Block the Event Loop or Coroutine.
    Martens
        13
    Martens  
       355 天前
    写切片 B 和读切片 B 的时候都要加锁
    dode
        14
    dode  
       355 天前
    先放 kafka ,再批量读出来处理呢
    8355
        15
    8355  
       355 天前
    @ghost024 #11 +1 跟我理解的一样
    而且整个过程感觉效率并不高
    使用中间件哪怕 redis stream 整个代码都可以简单很多
    joesonw
        16
    joesonw  
       355 天前 via iPhone
    能确定数量就用 channel ,不行的话用 linked list 。尽量避免用锁,传递锁的时候要传指针&。
    matrix1010
        17
    matrix1010  
       355 天前
    丢数据算 bug 吗? 如果算请写个并发的单元测试并加上-race 测一下
    Nazz
        18
    Nazz  
       355 天前
    数据量有点大, 建议使用 sync.Pool + 任务队列
    swqslwl
        19
    swqslwl  
    OP
       355 天前
    @lrh3321
    @RH 老哥代码放上了
    swqslwl
        20
    swqslwl  
    OP
       355 天前
    @ding2dong
    @fregie

    @ghost024
    @Martens
    @8355 对,这里确实是有问题。但是我加上后发现还是会丢
    leonshaw
        21
    leonshaw  
       355 天前
    conn 是什么协议?
    把加放锁和处理数据的位置再标一下。
    rrfeng
        22
    rrfeng  
       355 天前
    channel 里读 N 条出来直接处理掉,不要用切片缓存 /交互数据,就没这个问题了。

    这个切片设计的根本没什么道理。
    pkoukk
        23
    pkoukk  
       355 天前
    没理由 append B 加锁了还能丢数据啊
    你可能丢数据的地方在 err := json.Unmarshal([]byte(<-A), &fs)
    oldshensheep
        24
    oldshensheep  
       355 天前 via Android
    看你最终的代码感觉没什么问题。

    建议写个可以复现的 demo ,之前我也是出 bug ,感觉是用的第三方的库的问题。后来写了个可以复现的 demo ,发现是我代码的问题。

    我有很多莫名其妙的 bug 都是在写 demo 的时候发现代码真正错误的地方。

    比如说你这个代码,里面有网络连接,写数据库啥的,都给简化了,最终就是纯粹的逻辑代码,慢慢调试就发现问题了。
    而且也方便别人运行调试。
    ns09005264
        25
    ns09005264  
       355 天前
    handleData 里加锁处理数据,但是 txData 里 append 却没有加锁,
    所以当 handleData 正在处理数据的时候,txData 还在往里面 append 数据,
    等 handleData 处理完,清空了 B ,txData 在 handleData 处理数据的过程中所添加的数据也就被清除了。
    没有给写入加锁只给读取加锁,等于没加锁。

    另外你想用 handleData 异步处理数据,但是如果在 txData 里给 append 加锁,其实就等于同步处理数据了,没什么意义。考虑在 txData 里对数据进行分块或按时间进行分块,再将分块的数据传给 handleData ,连锁都不用。
    8355
        26
    8355  
       355 天前
    我的理解 handleData 这里完全没必要 也没必要用锁
    可以把写库代码直接放到 appendB = append(B,fs) 位置执行
    其次 db 本身是支持并发写库的,这里加锁意义不大,加了锁也都是在等待锁反而更慢
    leonshaw
        27
    leonshaw  
       355 天前
    检查一下发送端的返回值。
    如上面所说的,这样实现并没有并发。如果处理能力大于上游,同步处理就行;如果小于上游,最终结果就是一个协程在处理,一个在等锁,一个在等 channel 缓冲空间。
    reliefe
        28
    reliefe  
       355 天前
    这个问题根本应该在于多个线程操作同一个切片导致的,这里就会有很大不确定性。我问了 GPT-4 ,它给了很好的建议,把 B 换成 chan 而不是切片试试
    ```
    var A = make(chan string, 1048576)
    var B = make(chan flowStatistic, 1048576) // 使用带缓冲的 channel 而非切片
    ...

    func txData() {
    for {
    var fs flowStatistic
    err := json.Unmarshal([]byte(<-A), &fs)
    ...
    B <- fs // 将 fs 传递给 handleData
    }
    }

    func handleData() {
    var buffer []flowStatistic
    timer := time.NewTimer(5 * time.Second)

    for {
    select {
    case fs := <-B:
    buffer = append(buffer, fs)
    case <-timer.C:
    // 处理 buffer 中的数据
    ...
    buffer = make([]flowStatistic, 0)
    timer.Reset(5 * time.Second)
    }
    }
    }
    ```
    完整回复: https://flowus.cn/share/533684c0-2869-4507-8375-297103f09c77
    PS: 顺便一提在我的小站就可以随时用 GPT-4 了, liaobots.com
    quzard
        29
    quzard  
       355 天前
    ```go
    var fmutex = sync.Mutex{}
    var A = make(chan string, 1048576)
    var B = sync.Pool{
    New: func() interface{} {
    return make([]flowStatistic, 0, 10000)
    },
    }

    func foo(){
    go getData()
    go txData()
    go handleData()
    }

    // 接收数据
    func getData(){
    for {
    // ...
    data := conn.Read()
    A<-data
    }
    }


    func txData() {
    for {
    var fs flowStatistic
    err := json.Unmarshal([]byte(<-A), &fs)
    // ...

    fmutex.Lock()
    currB := B.Get().([]flowStatistic)
    currB = append(currB, fs)
    B.Put(currB)
    fmutex.Unlock()
    }
    }

    func handleData() {
    for {
    time.Sleep(5 * time.Second)
    fmutex.Lock()
    currB := B.Get().([]flowStatistic)

    // 进行数据聚合和存储操作
    // ...
    // 清空 B
    currB = currB[:0]
    B.Put(currB)
    fmutex.Unlock()
    }
    }

    ```
    quzard
        30
    quzard  
       355 天前
    @quzard #29 怎么发送后格式就乱了呢
    Anivial
        31
    Anivial  
       355 天前
    感觉可以换一种思路,通过 time.Ticker 和 select 来代替锁保证缓存数据不会被互相抢占影响
    for {
    select {
    case data := <-A:
    ...
    B = append(B,fs)
    case t := <-ticker.C: // ticker := time.NewTicker(5 * time.Second)
    // 聚合处理数据
    process(B)

    // 清空 B 保留容量
    B = B[:0:cap(B)]
    }
    }
    piaodazhu
        32
    piaodazhu  
       355 天前
    在楼主给的第二份代码其实也没有解决上面我提的那个问题,因为 goroutine1 在等待 goroutine2 放锁的时候,它栈里面的变量 B 就是旧的 B (底层指针不会变成你清空后新赋值的指针),所以 goroutine2 的清空操作 goroutine1 在这一次执行中是不可见的。

    试试这样修改?
    ```
    var fmutex = sync.Mutex{}
    var A = make(chan string, 1048576)
    var B_array = make([]flowStatistic,0) // <------
    var B = &B // <------

    func foo(){
    go getData()
    go txData()
    go handleData()
    }

    //接受数据
    func getData(){
    for {
    ...
    addr, err := net.ResolveUnixAddr("unixgram", sock)
    conn, err := net.ListenUnixgram("unixgram", addr)
    data := conn.ReadFromUnix()
    A<-data
    }
    }

    func txData(){
    for{
    var fs flowStatistic
    err := json.Unmarshal([]byte(<-A), &fs) //这里不断解析 A 传过来的数据
    ...
    fmutex.Lock()
    *B = append(*B,fs) // <------
    fmutex.Unlock()
    }
    }

    func handleData(){
    //这里每 5 秒钟对 B 中的数据进行聚合并入库,耗时较多。为了不丢数据,我锁住 B ,处理完后清空 B 中数据并解锁
    for{
    time.Sleep(5 * time.Second)
    fmutex.Lock()
    ...
    *B = make([]flowStatistic,0) // <------
    fmutex.Unlock()
    }
    }
    ```

    感觉大概率是这里的问题
    piaodazhu
        33
    piaodazhu  
       355 天前
    @piaodazhu 不好意思看错了,B 不在栈上,上面这个请忽略。。。

    另外,在 handleData()里面,可以在加锁之后:
    fmutex.Lock()
    tmp := B
    B = make([]flowStatistic, 0)
    fmutex.Unlock()
    ... // processing tmp

    可以减少加锁时间,看不能减少或者消除数据丢失?
    PythonYXY
        34
    PythonYXY  
       355 天前
    数据量也不小了,感觉还是上 Flink 吧,基于滚动窗口+RocksDB 状态后端做实时分析。
    picone
        35
    picone  
       355 天前
    感觉代码没有问题,但是有些能优化的地方,可以改成无锁化
    ```go
    func txData() {
    ticker := time.NewTicker()
    for {
    select {
    case <- ticker.C:
    go func() // report your data
    B = make()
    case evt <- A:
    B = append(B, evet)
    case <-ctx.Done():
    return
    }
    }
    }
    ```
    liuxu
        36
    liuxu  
       355 天前
    第二条附言的代码应该没问题了,golang 所有基础类型都不是线程安全的,txData()在不断自动扩容 B ,而 handleData()拿到的是旧指针,处理完旧指针的数据清空新 B 指针,导致了旧指针和新 B 指针这段时间 append()的数据丢失

    第一个附言等于没锁,handleData()内部没有线程安全问题,是单线程的,竞态出在 txData()的 append()和 handleData()的 B = make([]flowStatistic,0)之间
    ccde8259
        37
    ccde8259  
       355 天前 via iPhone
    这种地方 mutex 写 slice 不如写 chan……
    doraf
        38
    doraf  
       354 天前
    如果还有问题,能不能试试 atomic.Value 来存取 B 。
    txData 和 handleData 之间,能不能使用 chan 来传递 flowStatistic 。
    5 秒处理一次的话,在 txData 缓存数据,每 5 秒调用一次 go handleData 行不行(传递缓存数据给 handleData ),不知道语义还对不对。
    要不要考虑考虑 kafka 、flink 这种。
    xurh
        39
    xurh  
       354 天前
    我之前做爬虫收集数据也遇到过类似的问题,把数据聚合进行批量插入减少 io 。

    我采用的 chan ,然后启动一个协程监听 chan ,当收集一定数量的数据或者时间满足,就把数据写入 db

    ```go

    type DBWriter[T any] struct {
    Size int
    Interval time.Duration
    done chan struct{}
    ch chan T
    insertDB func([]T, int)
    }

    func (w *DBWriter[T]) Start() {
    ticker := time.NewTicker(w.Interval)
    records := make([]T, 0, w.Size)
    insert := func() {
    if len(records) == 0 {
    return
    }
    w.insertDB(records, w.Size)
    records = make([]T, 0, w.Size)
    }

    for {
    select {
    case <-w.done:
    insert()
    return

    case <-ticker.C:
    insert()

    case data := <-w.ch:
    records = append(records, data)

    if len(records) == w.Size {
    insert()
    }
    }
    }
    }

    ```
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   1015 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 30ms · UTC 19:04 · PVG 03:04 · LAX 12:04 · JFK 15:04
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.