问题是这样, 现在系统中有大量去和第三方 API 交互的任务, 比如有 1000 个用户, 每个用户又有各自 1 万个小的记录去和第三方 API 慢慢交互, 或者没有那么多记录但是有一个很耗时的同步接口, 可能 10 分钟以上, 其实时间都是消耗在网络 IO 上, 大部分时间在等网络, 之前的方式就是一个线程池, 把所有大小任务塞进去, 但是这个线程池大小很难搞, 多了的话, 有时会突然来一堆任务占住 CPU 和数据库, 少了的话, 一大堆任务又阻塞住.
现在想搞成分布式好几台机器一起跑, 考察了一下方案, 有点迷惑:
所以, 其实就是我想找一个比较现成的框架, 能处理超长的任务队列, 分布式, 并发的执行, 可以自动削峰填谷, 有一些任务自动处理, 比如重试, 故障转移等等, 又能够有一些保证一致性的机制, 比如按 job+某个参数确保不会重复执行, 还能程序方式发起调度, 而不是在某个管理后台手动编辑
我想知道这样的东西存在吗, 还是必须自己实现, 求各位大佬赐教
1
aguesuka 2022-04-26 12:33:50 +08:00
storm
|
2
biubiuF 2022-04-26 12:40:35 +08:00
你需要 kafka ,把你现在的 jobs 弄成消费者
|
3
RedBeanIce 2022-04-26 12:59:38 +08:00
可能是 xxjob ????我记得有分片处理,,,就是一堆小任务,大家去处理
|
4
bthulu 2022-04-26 13:06:09 +08:00
既然时间都消耗在网络 IO 上, 上 windows 系统, 用 IOCP 去调接口, 单机就能搞定了, 用不着搞这么多骚操作
|
5
jorneyr 2022-04-26 13:09:21 +08:00
@biubiuF Kafka 的每个 partition 一个消费者组里同时只能有一个消费者进行消费,这种情况我觉得 RabbitMQ 可能更合适,不必明确的限制消费者个数,看情况随时动态增减消费者,每个消息可以使用阻塞的方式执行。
|
6
Leexiaobu 2022-04-26 14:11:28 +08:00
Akka
|
7
lmshl 2022-04-26 14:15:32 +08:00
改异步纤程,你这才一千万个 IO 小任务,犯不着上分布式。Akka Stream (调度) + Akka HTTP (调 API ) 随便搞一搞单机就完事了
|
8
ming159 2022-04-26 14:16:34 +08:00
如你所说:“其实时间都是消耗在网络 IO 上” 线程是不解决 IO 问题的,你需要的是 异步 IO 处理机制。一个线程同时处理多个 IO ,而不是一个线程处理一个 IO 。
|
9
ymmud 2022-04-26 14:25:24 +08:00
akka cluster sharding , 根据需求分片就行了
|
10
lmshl 2022-04-26 14:25:58 +08:00
我写过一个
所有 fiber 去数据库查任务状态,select * from tasks where state = 'todo',然后执行这一批任务,更新任务状态。 最后并行 128 同时跑所有 fiber |
11
zmal 2022-04-26 14:57:11 +08:00
需求场景是两个问题:
1. 是否要把这部分逻辑从主系统解耦出来。 2. 怎样加快这部分业务的处理速度,减少资源占用,包括但不限于可以任意扩容的分布式、异步 IO 等等。 如果是我的话,个人对 Flink 比较熟悉,可能会选择解耦后用 Flink 来处理,Flink 解决了分布式、一致性容错等问题。 akka 解决的是异步 io 并发量问题,楼上 akka 的方案应该也是可行的。看你对哪个工具比较熟悉了。 |
12
git00ll 2022-04-26 15:26:13 +08:00
`但是有一个很耗时的同步接口, 可能 10 分钟以上, 其实时间都是消耗在网络 IO 上, 大部分时间在等网络`
这句话不明白,啥接口要耗时 10 分钟? 等网络是什么意思。如果接口一次请求响应要 10 分钟,多开点线程如 200-300 个,网络堵塞的时候是不会大量占用 cpu 的。关键如果接口能否承受这么高并发数。 |
13
5boy 2022-04-26 16:12:24 +08:00
mark, 有没有不用大数据框架实现的方式?
|
14
litchinn 2022-04-26 16:24:53 +08:00
/t/848357 ,隔壁刚提出的这个动态线程池不知道能不能实现这个需求。另外你说线程池大小不好调,换成分布式多个机器跑,那节点数量不是一样需要调整吗,k8s 弹性伸缩?
|
15
misaka19000 2022-04-26 16:55:55 +08:00
用协程或者异步 IO
|
16
Saurichthys 2022-04-26 16:58:49 +08:00
不要用 xxl-job 的方案,基于数据库,性能不佳,莫名其妙问题很多
|
17
yesterdaysun OP @git00ll 说的不清楚, 其实是一个长流程, 比如请求一个报告, 但是不会立即返回, 需要等第三方处理好, 才能拿到, 中间就每隔 1-2 分钟去轮询一次看看报告有没有好, 通常都要 10 分钟左右, 关键不是每种任务都是这样的, 如果单为它建一个线程池又感觉有点过了, 想搞个通用的解法
上面的我都研究了一下, 我这个系统比较简单, 本身就是个单体, 并不是分布式的, 这次也只是想要把这个后台任务独立出去搞多机并行, 感觉我这个还不到动用 akka/协程之类的方案的地步, 应该还是简单点, 一个简单的调度系统加动态线程池就足够了, 美团开源的那个动态线程池看上去比较适合, 我先研究一下试试看 |
18
polarbear007 2022-04-26 17:56:55 +08:00
个人认为使用非阻塞 io 即可
|
19
jekkro 2022-04-26 18:17:50 +08:00
用 redis 实现异步队列即可,一个进程专门负责插入任务到 Redis 队列中,另外几个负责从队列中获取信息并执行,完成后更新数据库里的状态。如果发生 Redis 所在的机器 down 机,则负责插入任务的那个进程重新把没有完成的再插入一遍(不过这个目前为止还没有发生过)。我有类似的业务,已经跑了 12 年了。
另外因为 Redis 有各种复杂数据结构,可以满足延时队列,优先级队列,自动去重等功能。感觉性能优秀,代码简单。 |
20
jekkro 2022-04-26 18:21:16 +08:00
不能用非阻塞 io 的原因一般是因为那些接口库不是自己实现的,没办法去改造那些接口底层库,虽然 http 的接口自己也可以实现,但是有些场景(比如各种开放平台的接口库)不可能把第三方提供的接口库重新写一边,而仅仅是为了解决阻塞 io 的问题。
|
21
lmshl 2022-04-26 18:28:37 +08:00
@yesterdaysun 以上技术方案中,综合代码量和开发难度来看,从易到难依次应该是
纤程 >> Akka Stream > nio-pool > xxjob/scheduler > 动态线程池屎上雕花 >> akka cluster sharding >> akka cluster without sharding 纤程是真的简单,你这需求 20-50 行左右就完事了,不就是个 flow = post(...) >> (sleep(1.minutes) *> check(xxx)).retryWhile(isCompleted) >> retrieve() 然后 tasks.foreachPar(<你想开多大并行>)(flow) 的事 |
22
outoftimeerror 2022-04-26 18:33:29 +08:00
我也写 java ,不过你这个需求让我选型的话,我会用 golang (goroutine+chan)+ redis
|
23
XhstormR02 2022-04-26 19:24:21 +08:00 via Android
@lmshl java 的纤程 Quasar ,最近一次更新是 2018 年,都好多年没更新了 https://github.com/puniverse/quasar ,倒不如用 kotlin 的 coroutines
https://github.com/Kotlin/kotlinx.coroutines/ |
24
dddd1919 2022-04-26 21:27:40 +08:00
显然是该上 MQ 了,把用户放到队列,由消费端去挨个处理用户任务,如果单个用户跑的话配一个消费任务就够了
强业务需求建议 RabbitMQ/RocketMQ |
25
lmshl 2022-04-26 21:41:39 +08:00
@XhstormR02 反正我说的也不是 Java 🐶
其实上面写的是 Scala 伪代码 🐶 档燃,Kotlin 也不错,起码有 suspend/await 可以用,不像 IO Monad 要切换编程思维 |
26
mind3x 2022-04-27 02:40:39 +08:00 via Android
|
29
yhvictor 2022-04-27 23:36:07 +08:00 via iPhone
协程应该满好弄的。
nio 有点难写。 但我觉得吧,楼主这工作分两部分,第一部分是网络 io 等待准备好,第二部分是处理数据。 第一部分是 io 密集,第二部分是 cpu 密集。 所以如果拆成两部分,第一步就可以线程开满,直到数据准备好,放入一个 queue 。 第二步开线程约等于或小于 cpu 核心数,从 queue 中读准备好的数据源并处理。 齐活。 |
30
byte10 2022-04-28 09:35:05 +08:00
@polarbear007 是的,一个 NIO 就解决啦,花里胡哨的,想一些错误的方案。
首先 IO 密集型,线程开到 1000 个都不是问题,线程在 IO 的时候不占用 cpu 。当然可能同时响应时就会出现 cpu 拉满,所以 cpu 使用率就是锯齿形的,不好分析瓶颈。 你这个核心问题就是 IO 时间不确定,无法确定最大线程数。你可以看我的那个教程,https://www.bilibili.com/video/BV1FS4y1o7QB , NIO 如何无视 IO 时间,解决线程池大小的问题。你千万不要搞分布式,分布式是单机 cpu 性能出现瓶颈才干的事情,你这个场景一个树莓派 4B 就能完成。一定要切记,这些很基本很基本的问题,不要把事情想复杂了,在这一点犯错的人太多了。 至于多个异步转同步问题,countdownlatch 和 cyclicbarrier 都能很好解决。 找到最核心的问题,解决核心问题,加油。 |
31
wolfie 2022-04-28 15:37:50 +08:00
做过类似功能,自己实现了一个,参考了 xxl-job 的表结构。
# some_table - id - status ( running 、succeed 、failed ) # some_table_job (频繁扫描表) - some_table_id - is_running - next_run_time (索引字段) - last_run_time (索引字段)(几分钟一扫描,防止异常结束,长时间未完成) - version (版本号,乐观锁) # some_table_job_log - some_table_id - some_table_job_id - result ( succeed 、failed ) 1. 新增 some_table ,同时新增 some_table_job 2. 定时任务扫描 some_table_job ,拉取任务数据 3. 任务执行完 - 成功:写入 some_table_job_log ,删除 some_table_job ,回写 some_table 状态。 - 失败:写入 some_table_job_log ,计算 some_table_job 下一次执行时间。 |