处理函数写得不太稳健,会吊死而导致
FutureRetList[i].running()一直认为是 True
粗暴做了个超时认定
看了 concurrent.futures 的一些方法,看不出有什么方法能在外部把进程池里某个进程推倒?
#是否进行多线程处理
MultiProcFlag == True
#定义进程数量
ProcessAmount = 12
if MultiProcFlag == True:
FutureList = []
FutureRetList = []
FutureStartTimeList = []
FutureProcSuit = []
for i in range(ProcessAmount):
FutureList.append(futures.ProcessPoolExecutor(max_workers=1))
for i in range(ProcessAmount):
FutureRetList.append(futures.Future())
for i in range(ProcessAmount):
FutureProcSuit.append('0')
for i in range(ProcessAmount):
FutureStartTimeList.append(time.time())
pass
# 多进程处理
if MultiProcFlag == True:
insertPoolFlag = False # 在 while 循环中判断是否成功加入进程池
while insertPoolFlag == False:
for i in range(ProcessAmount):
Process_i = str(i + 1)
if FutureRetList[i].running() == True:
#进行时间计算,如果某进程超时,关闭进程,重新提交任务
tmpTime = (time.clock() - FutureStartTimeList[i])
# 如果进程持续时间超过 15 分钟,认定任务失败,需要重新进行
if tmpTime > 900:
FutureList[i].shutdown() # <-恐怕不是这样乱来的?
#FutureList[i].xxxx? # <-- how?
time.sleep(5)
FutureRetList[i] = FutureList[i].submit(SProcessFunc, str(i + 1), SomeDict,FutureProcSuit[i],countt,ErrorLogFilePath)
print("进程池:[" + Process_i + "] [重新提交]了: [" + FutureProcIDs[i] + "]",f"{time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())}")
else:
print("进程池:[" + Process_i + "] 已经运行了 [" + '[%.2f] 秒' % (time.clock() - FutureStartTimeList[i]))
time.sleep(1)
pass
else: # 见到有空闲的进程就提交任务
FutureRetList[i] = FutureList[i].submit(SProcessFunc, str(i + 1), SomeDict,Task[j],countt,ErrorLogFilePath)
FutureStartTimeList[i] = time.time()
FutureProcSuit[i] = Task[j] # 记下这个任务,准备在失败的时候,再调出进行重新提交,反正是死磕到任务成功为止
print("进程池:[" + Process_i + "] 提交了: [" + countt + "] 是第 [" + str(countt) + "] 个任务.",f"{time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())}")
insertPoolFlag = True
break
pass
time.sleep(2)
else: # 单进程处理
ProcessRet = SProcessFunc("1",TaskDict,Task[j],countt,ErrorLogFilePath)
1
ManjusakaL 2019-11-25 01:11:57 +08:00 via Android
直接记录 PID,然后 kill 不就完了。。。
|
2
qazwsxkevin OP @ManjusakaL 可以对线程启动的内容纪录 PID ? 假如各个进程跑的都是是 Chrome.exe ,12 个进程跑满,内存会有 N 个 python.exe ,N 个 Chrome.exe ,怎么分辨谁是谁? 能零失误杀进程?
以前只试过用 psutil.pids()来获取 PID 杀进程。。。 |
3
ManjusakaL 2019-11-25 10:45:44 +08:00
@qazwsxkevin 如果怕误杀进程的话,可以在提交任务的时候,传入一个 unique 的参数,然后开个队列,将 unique 和 pid 回传,然后在 main process 做一个 map,直接对想杀的组合发送 SIGKILL 就行了
|
4
qazwsxkevin OP |
5
Latin 2019-11-25 11:28:42 +08:00
executor = ProcessPoolExecutor(1)
executor.submit(xxx,xxx) pid = list(executor._processes.keys())[0] 就是记录 然后调用 shell 命令 Kill 找了好久,没有优雅的解决方案 |
6
ManjusakaL 2019-11-25 11:35:07 +08:00
@qazwsxkevin 就是一个唯一的标识,可以理解是任务 ID,这样你可以将具体的任务和 PID 绑定
|
7
ClericPy 2019-11-25 14:35:34 +08:00
|
8
qazwsxkevin OP 做了一个简单测试,情况不妙,估计不好。。。
else: # 见到有空闲的进程就提交任务 FutureRetList[i] = FutureList[i].submit(SProcessFunc, str(i + 1), SomeDict,Task[j],countt,ErrorLogFilePath) FutureStartTimeList[i] = time.time() FutureProcSuit[i] = Task[j] # 记下这个任务,准备在失败的时候,再调出进行重新提交,反正是死磕到任务成功为止 print("进程池:[" + Process_i + "] 提交了: [" + countt + "] 是第 [" + str(countt) + "] 个任务.",f"{time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())}") pid = (list(FutureList[i]._processes.keys()))[0] print(pid) time.sleep(20) #进行 20 秒左右后杀进程 exeCstr = "taskkill -f -pid " + str(pid) os.system(exeCstr) time.sleep(10) #再次提交 FutureRetList[i] = FutureList[i].submit(SProcessFunc, str(i + 1), SomeDict,Task[j],countt,ErrorLogFilePath) 在杀进程后,直接就抛出异常了,再次提交也是不行的,直接报 1 码结束了主程序,整体结束。 outut: Traceback (most recent call last): File "D:/Work//SPFromDB.py", line 309, in <module> FutureRetList[i] = FutureList[i].submit(SProcessFunc, str(i + 1), SomeDict,Task[j],countt,ErrorLogFilePath) File "C:\Users\Administrator\AppData\Local\Programs\Python\Python36\lib\concurrent\futures\process.py", line 452, in submit raise BrokenProcessPool('A child process terminated ' concurrent.futures.process.BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore 进程已结束,退出代码 1 |