V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
supuwoerc
V2EX  ›  Go 编程语言

关于 Go 并发获取数据的问题,请教大家

  •  
  •   supuwoerc · 4 天前 · 796 次点击

    晚上看了会 Medium 上的博客,发现了一篇关于使用 Go 来实现并发的获取数据。

    我在作者原有的基础上改造了一下,限制了最大的数量,不知道有没有什么疏漏,请大家指教指教。

    另外我也在思考关于 GC 的问题,如果代码中在某一次循环中走了<-ctx.Done()的分支,直接返回了结果,productsChan 会被怎么回收?我应该如何关闭相关的 channel ?还是让程序自己处理?

    Medium 原文地址

    我的改造:

    package main
    
    import "sync"
    import "runtime"
    import "fmt"
    
    var LIST_PRODUCT_TYPE = [100000]string{"food", "electronics", "clothing","...more"} // ......非常多的数据需要查询
    
    type GetListProductResponse struct {
        Data []ProductListResponse `json:"data"`
    }
    
    type ProductListResponse struct {
        Code         string `json:"code"`
        Name         string `json:"name"`
        Price        string `json:"price"`
        Status       bool   `json:"status"`
    }
    
    func getProducts(ctx context.Context, req *GetProductListRequest) (*GetListProductResponse, error) {
        // calling endpoint 3rd party
        // parse to response
        // and return the data
        return &productList, nil
    }
    
    func main() {
        ctx, cancelFunc := context.WithCancel(ctx)
    	defer cancelFunc()
        wg := sync.WaitGroup{}
        doneChan := make(chan struct{}, 1)
        productsChan := make(chan *GetListProductResponse)
        errChan := make(chan error)
        // LIST_PRODUCT_TYPE 数量非常大,需要限制最大的并发数量
        maxConcurrency := 5
        semaphore := make(chan struct{}, maxConcurrency)
        wg.Add(len(LIST_PRODUCT_TYPE))
        for key := range LIST_PRODUCT_TYPE {
            req := &GetProductListRequest{
                ProductType: LIST_PRODUCT_TYPE[key],
            }
            select {
            case <-ctx.Done():
                return nil, ctx.Err()
            case semaphore<-struct{}{}:
                go func() {
                    defer wg.Done()
                    defer func() { <-semaphore }()
                    products, err := getProductList(ctx, req)
                    if err != nil {
                        errChan <- err
                        return
                    }
                    productsChan <- products
                }()
            }
         }
    
         go func() {
             wg.Wait()
             doneChan <- struct{}{}
         }()
    
         var (
             catalogues GetListProductResponse
             data       []ProductListResponse
         )
     
         for {
             select {
             case <-ctx.Done():
                 return nil, ctx.Err()
             case err := <-errChan:
                 return nil, err
             case products := <-productsChan:
                 data = append(data, products.Data...)
                 catalogues.Data = data
             case <-doneChan:
             return &catalogues, nil
         }
       }
    }
    
    第 1 条附言  ·  4 天前
    换了一下写法,调整了优先级
    ```go
    for key := range LIST_PRODUCT_TYPE {
    req := &GetProductListRequest{
    ProductType: LIST_PRODUCT_TYPE[key],
    }
    wg.Add(1)
    select {
    case <-ctx.Done():
    return nil, ctx.Err()
    default:
    semaphore<-struct{}{}
    go func() {
    defer wg.Done()
    defer func() { <-semaphore }()
    products, err := getProductList(ctx, req)
    if err != nil {
    errChan <- err
    return
    }
    productsChan <- products
    }()
    }
    }
    ```
    3 条回复    2025-03-28 19:53:46 +08:00
    dcalsky
        1
    dcalsky  
       4 天前   ❤️ 2
    新手自己写容易出问题,推荐: https://github.com/sourcegraph/conc
    supuwoerc
        2
    supuwoerc  
    OP
       4 天前
    @dcalsky 感谢,我学习下这个库的实现
    awanganddong
        3
    awanganddong  
       2 天前
    ```

    func worker(wg *sync.WaitGroup, jobs chan int, i int) {
    defer wg.Done()
    for job := range jobs {
    fmt.Printf("jobs:%v,goroutine:%v\n", job, i)
    }
    }
    func main() {
    numWorker := 3
    numJobs := 1000
    var wg sync.WaitGroup
    jobs := make(chan int)
    for i := 0; i < numWorker; i++ {
    wg.Add(1)
    go worker(&wg, jobs, i)
    }
    for j := 1; j <= numJobs; j++ {
    jobs <- j
    }
    close(jobs)
    wg.Wait()
    }
    我能想到的就是执行任务这里做并发控制。
    ```
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   5838 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 30ms · UTC 02:24 · PVG 10:24 · LAX 19:24 · JFK 22:24
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.