晚上看了会 Medium 上的博客,发现了一篇关于使用 Go 来实现并发的获取数据。
我在作者原有的基础上改造了一下,限制了最大的数量,不知道有没有什么疏漏,请大家指教指教。
另外我也在思考关于 GC 的问题,如果代码中在某一次循环中走了<-ctx.Done()的分支,直接返回了结果,productsChan 会被怎么回收?我应该如何关闭相关的 channel ?还是让程序自己处理?
我的改造:
package main
import "sync"
import "runtime"
import "fmt"
var LIST_PRODUCT_TYPE = [100000]string{"food", "electronics", "clothing","...more"} // ......非常多的数据需要查询
type GetListProductResponse struct {
Data []ProductListResponse `json:"data"`
}
type ProductListResponse struct {
Code string `json:"code"`
Name string `json:"name"`
Price string `json:"price"`
Status bool `json:"status"`
}
func getProducts(ctx context.Context, req *GetProductListRequest) (*GetListProductResponse, error) {
// calling endpoint 3rd party
// parse to response
// and return the data
return &productList, nil
}
func main() {
ctx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
wg := sync.WaitGroup{}
doneChan := make(chan struct{}, 1)
productsChan := make(chan *GetListProductResponse)
errChan := make(chan error)
// LIST_PRODUCT_TYPE 数量非常大,需要限制最大的并发数量
maxConcurrency := 5
semaphore := make(chan struct{}, maxConcurrency)
wg.Add(len(LIST_PRODUCT_TYPE))
for key := range LIST_PRODUCT_TYPE {
req := &GetProductListRequest{
ProductType: LIST_PRODUCT_TYPE[key],
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case semaphore<-struct{}{}:
go func() {
defer wg.Done()
defer func() { <-semaphore }()
products, err := getProductList(ctx, req)
if err != nil {
errChan <- err
return
}
productsChan <- products
}()
}
}
go func() {
wg.Wait()
doneChan <- struct{}{}
}()
var (
catalogues GetListProductResponse
data []ProductListResponse
)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-errChan:
return nil, err
case products := <-productsChan:
data = append(data, products.Data...)
catalogues.Data = data
case <-doneChan:
return &catalogues, nil
}
}
}
![]() |
1
dcalsky 4 天前 ![]() 新手自己写容易出问题,推荐: https://github.com/sourcegraph/conc
|
3
awanganddong 2 天前
```
func worker(wg *sync.WaitGroup, jobs chan int, i int) { defer wg.Done() for job := range jobs { fmt.Printf("jobs:%v,goroutine:%v\n", job, i) } } func main() { numWorker := 3 numJobs := 1000 var wg sync.WaitGroup jobs := make(chan int) for i := 0; i < numWorker; i++ { wg.Add(1) go worker(&wg, jobs, i) } for j := 1; j <= numJobs; j++ { jobs <- j } close(jobs) wg.Wait() } 我能想到的就是执行任务这里做并发控制。 ``` |