from multiprocessing import Pool
def doSomething(caseNumber):
# do xxx use caseNumber.
return Value
def SomeFunc(poolpool):
#do xxx
if xxx > 10:
return True
else:
return False
pass
if __name__ == '__main__':
#建池
Apool = Pool(10)
# 向 Apool 加异步
Apool.apply_async(doSomething, someArgs)
Apool.apply_async(Afunc, someArgs)
Apool.apply_async(Bfunc, someArgs)
Apool.apply_async(Cfunc, someArgs)
# Cret = Apool.apply_async(Cfunc, someArgs)
#省略...
#测试用,已注释,留着
# Bret = Apool.apply_async(Bfunc, someArgs)
# while True:
for i in SomeSuit:
#1、执行到此时,在这里能否判断出 Apool 进程池,此时有多少个进程(求数量)在跑?
#2、在这里能否做到判断异步的 Bfunc 执行完毕没?
#3、假设 2 的想法可以做到,并给出判断结果(True or False),
# 在这里能否能马上拿到异步 Cfun 执行完毕后的返回值?我理解的线程池,是必须 close 和 join 完成后,统一出结果?
pass
print("结束 for SomeSuit.")
Apool.close()
Apool.join()
1、执行到此时,在这里能否判断出 Apool 进程池,此时有多少个进程(求数量)在跑?
2、在这里能否做到判断异步的 Bfunc 执行完毕没?
3、假设 2 的想法可以做到,并给出判断结果(True or False),
在这里能否能马上拿到异步 Cfun 执行完毕后的返回值?我理解的线程池,是必须 close 和 join 完成后,统一出结果?
1
ClericPy 2019-10-24 00:41:25 +08:00
def apply_async(self, func, args=(), kwds={}, callback=None,
error_callback=None): ''' Asynchronous version of `apply()` method. ''' if self._state != RUN: raise ValueError("Pool not running") result = ApplyResult(self._cache, callback, error_callback) self._taskqueue.put(([(result._job, 0, func, args, kwds)], None)) return result 1 2 3 基本都有办法的, 在一切皆对象的 Python 里, 几乎所有玩意都能自省 看看源码去吧, 一点点说太麻烦了 比如 apply_async 方法的返回值就是 ApplyResult 对象, ApplyResult 对象里可以判断是否完成以及立刻取得结果 Apool 的 self._pool = [] 这里也可以看有多少 多看源码吧 友情提醒, 你这个用法已经过时了, 现在多进程多线程的池都建议使用 concurrent.futures 里面那俩, 借助很多语言都在流行的 Future 概念, 可以在同步代码里面把异步操作简化. 尤其是借助 callback 方式(虽然你上面的代码也可以用回调)也算不难理解 |
2
ClericPy 2019-10-24 00:43:02 +08:00
然后还有 close 和 join 忘了说
前者的意思是进程池已经关闭, 如果再添加新任务, 会直接抛错, 而不是真正关闭了所有进程 后者意思是, 主线程 /主进程 整个阻塞住, 直到进程池里的任务全都完成 你想直接拿那个结果, 别 join, 直接对那个提交后得到的对象使用 get 方法 |
3
qazwsxkevin OP @ClericPy,有不明白的地方,concurrent.futures,比如:
``` eStatusSuit = [] e = futures.ProcessPoolExecutor(max_workers=5) eStatus = e.submit(ProcessCaseID,someVarA ,someVarB) eStatusSuit.append(eStatus) # eStatus = e.submit(ProcessCaseID,someVarC ,someVarD) eStatusSuit.append(eStatus) # eStatus = e.submit(ProcessCaseID,someVarE ,someVarF) eStatusSuit.append(eStatus) #此时是向 e 提交了 3 个任务 #eStatus 对象,我看了一下,似乎是无法查看到 33 个任务具体状态,只能等待 eStatus 全体执行完毕,全部返回 eStatus.result()? #eStatus.result()是个阻塞式,想不到怎么用。。。 #我是想建立能跑 5 个进程的可控队列,不知道这么干是否合适,还是有更方便的方式? aExecutor = futures.ProcessPoolExecutor(max_workers=1) bExecutor = futures.ProcessPoolExecutor(max_workers=1) cExecutor = futures.ProcessPoolExecutor(max_workers=1) dExecutor = futures.ProcessPoolExecutor(max_workers=1) eExecutor = futures.ProcessPoolExecutor(max_workers=1) 然后做个 aExecutorStatus = aExecutor.submit(ProcessCaseID,someVarA ,someVarB) bExecutorStatus = bExecutor.submit(ProcessCaseID,someVarC ,someVarD) #省略... #对各个 ExecutorStatus 的 running(),done()进行循环判断,哪个 False/True 了,就从 queue 里取任务提交过去,哪个失败了,再调度一下优先权 if aExecutorStatus.running(): xxx #省略... 不知道是不是这样乱来的? ``` |
4
ClericPy 2019-10-26 18:26:31 +08:00
ProcessPoolExecutor 可以看做一个进程池执行器, 朝里面提交函数和参数以后, 会返回一个 Future, 这时候任务就开始执行了, 所以常见的用法就是:
1. 新建一个进程池执行器, 设置好并发数 pool = ProcessPoolExecutor(5) 2. futures = [pool.submit(func, var[0], var[1]) for var in var_list] 这时候任务都在后台派出的线程执行中 3. 然后就该等待任务完成了, 如果想要按执行结束的顺序来处理, 就 from concurrent.futures import as_completed for future in as_completed(futures): result = future.result(timeout=None) 如果无所谓完成顺序, 但是在意任务匹配顺序, 就 for future in futures: result = future.result(timeout=None) 这里 timeout 可以配置成一个 float, 然后 try catch 住 timeouterror, 不过不确定多进程会不会杀死超时任务, 因为平时我大都用线程, 线程是肯定杀不死的... 如上, 并发的好处就体现出来了, 也就是说, 在没达到并发限制的情况下, 整个任务理论上完成耗时不会超过最慢任务的耗时, 虽然实际上会受并发限制和 CPU 数量影响 @qazwsxkevin |