看过了一下 star 比较高的协程池实现,还有字节开源的实现,完全是 java/c++之类的外行实现思路
协程/线程池,最基本的元件 就是 队列 + 协程/线程,M:N 模型
这两个组件在 go 里边天生就有啊,为什么再搞一套 task queue 呢?
控制队列容量:make(chan, cap) 第二参数就可以
想要控制协程/线程数量,再辅助一个 chan 就可以了,
代码实现如下,100 行搞定:
我把它放到 github 上 gopool 喜欢的老铁可以给个 star
// GoPool is a minimalistic goroutine pool that provides a pure Go implementation
type GoPool struct {
noCopy
queueLen atomic.Int32
doTaskN atomic.Int32
workerN atomic.Int32
options Options
workerSem chan struct{}
queue chan func()
}
// NewGoPool provite fixed number of goroutines, reusable. M:N model
//
// M: the number of reusable goroutines,
// N: the capacity for asynchronous task queue.
func NewGoPool(opts ...Option) *GoPool {
opt := setOptions(opts...)
if opt.minWorkers <= 0 {
panic("GoPool: min workers <= 0")
}
if opt.minWorkers > opt.maxWorkers {
panic("GoPool: min workers > max workers")
}
p := &GoPool{
options: opt,
workerSem: make(chan struct{}, opt.maxWorkers),
queue: make(chan func(), opt.queueCap),
}
for i := int32(0); i < p.options.minWorkers; i++ { // pre spawn
p.workerSem <- struct{}{}
go p.worker(func() {})
}
go p.shrink()
return p
}
// QueueFree returns (capacity of task-queue - length of task-queue)
func (p *GoPool) QueueFree() int {
return int(p.options.queueCap - p.queueLen.Load())
}
// Workers returns current the number of workers
func (p *GoPool) Workers() int {
return int(p.workerN.Load())
}
// Go submits a task to this pool.
func (p *GoPool) Go(task func()) {
if task == nil {
panic("GoPool: Go task is nil")
}
select {
case p.queue <- task:
p.queueLen.Add(1)
case p.workerSem <- struct{}{}:
go p.worker(task)
}
}
func (p *GoPool) worker(task func()) {
p.workerN.Add(1)
defer func() {
<-p.workerSem
p.workerN.Add(-1)
if e := recover(); e != nil {
if p.options.panicHandler != nil {
p.options.panicHandler(e)
}
}
}()
for {
task()
task = <-p.queue
if task == nil {
break
}
p.doTaskN.Add(1)
p.queueLen.Add(-1)
}
}
func (p *GoPool) shrink() {
ticker := time.NewTicker(p.options.shrinkPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
doTaskN := p.doTaskN.Load()
p.doTaskN.Store(0)
if doTaskN < p.options.tasksBelowN {
closeN := p.workerN.Load() - p.options.minWorkers
for closeN > 0 {
p.queue <- nil
closeN--
}
}
}
}
}
1
csh010101 2023-09-12 18:47:52 +08:00
好好好
|
2
Nazz 2023-09-12 19:16:27 +08:00 1
应该是这样的:
```go type channel chan struct{} func (c channel) add() { c <- struct{}{} } func (c channel) done() { <-c } func (c channel) Go(f func()) { c.add() go func() { f() c.done() }() } ``` |
3
lan171 2023-09-12 19:25:55 +08:00
都协程了还要池化?
|
5
lovelylain 2023-09-13 08:24:12 +08:00 via Android
@Nazz 看上去好像也行,OP 代码这么多是完善了哪些方面?
|
6
shaoyie OP @lovelylain 这不算多吧,这已经算是最精简的了吧,我就是因为看到别人实现的太复杂了,所以写了一个,功能就是可以定时收缩,加了几个计数而已
|
8
Nazz 2023-09-13 08:47:56 +08:00 via Android
@lovelylain 最精简的版本,没有之一
|
9
Mohanson 2023-09-13 09:46:44 +08:00 1
XY 问题.
都协程了就没必要池化, 协程模型需要的是速率控制. 用令牌桶算法 10 行代码就能解决问题. 谈起协程就自动类比为进程, 然后想当然认为应该有一个"协程池", "完全是 java/c++之类的外行实现思路" |
10
dyllen 2023-09-13 10:42:52 +08:00
之前看了有的框架有协程池化的组件,看里面的测试结果,除了内存占用优势,性能上没任何优势。
|
11
keakon 2023-09-13 10:54:11 +08:00
|
12
troywinter 2023-09-13 13:49:06 +08:00
一个 semaphore 就能解决的问题,不需要写这么多代码
|
14
shaoyie OP @troywinter 有 sem 不也得有个 job 队列吗
|
16
kneo 2023-09-13 13:57:01 +08:00 via Android
国内大厂自己轮的东西基本都是性能驱动。如果不是为了解决他们自己生产环境里遇见的性能问题,应该不会搞这么个东西出来。
|
17
ZSeptember 2023-09-13 14:05:33 +08:00
学过 Go 的都能写出这个代码,,但是并不一定能写出那些复杂的 pool 。
你这个本质上是一个并发控制而已,最基本的优雅关闭也没处理。 |
19
shaoyie OP @ZSeptember 也不一定,有些人的实现思路就够 gopher ,自己实现一套条件变量 + queue ,我是觉得没必要,go 天生就带这些东西。另外,你说的优雅关闭 是指关闭 pool 吗?我是觉得没必要,本来 go pool 就不是很必须,再加上一个动态的 pool 就更没必要了
|
21
TanKuku 2023-09-13 14:48:35 +08:00 via Android
不懂就问,也没改 runtime 复用什么东西,就单纯控制数量也叫池了吗?
|
22
Pythoner666666 2023-09-13 15:15:27 +08:00
所有的池化技术,无论是 mysql redis 还是 http 的池化技术核心都是复用连接再然后是控制连接的数量不至于把 server 的连接资源都耗尽,私以为你这顶多算是控制并发。但是如果只需要控制并发你这未免也太繁琐了
|
23
shaoyie OP @Pythoner666666 是的用个计数器也可以达到效果,但这样不就是更通用一点嘛,
|
24
huija 2023-09-13 19:00:19 +08:00
官方池化实现不给出来了么,sync.Pool ,如果用不到的,那就说明压根不需要池化(比如协程)
|
25
dyllen 2023-09-14 10:55:06 +08:00
@TanKuku 他这个代码控制了协程数量,也复用了协程,shrink 这个函数就是发消息结束超时生存的协程的,worker 里面没任务会阻塞等待任务来,直到 shrink 发了结束信号退出。
|
26
lysS 2023-09-15 15:13:37 +08:00
为什么要控制协程数量?即使要限制也是限制请求并发
|
27
index90 2023-09-18 18:42:57 +08:00
协程池与最大并发数控制傻傻分不清?
|