Python threading.Thread只能用私有方法self来停止.__ Thread_stop()
发布时间:2020-05-23 09:32:19 所属栏目:Python 来源:互联网
导读:我有一个函数接受大量的x,y对作为输入,使用numpy和scipy做一些精细的曲线拟合,然后返回一个值.为了尝试加快速度,我尝试使用Queue.Queue将数据提供给两个线程.数据完成后.我试图让线程终止,然后结束调用进程并将控制权返回给 shell. 我试图理解为什么我必须在t
|
我有一个函数接受大量的x,y对作为输入,使用numpy和scipy做一些精细的曲线拟合,然后返回一个值.为了尝试加快速度,我尝试使用Queue.Queue将数据提供给两个线程.数据完成后.我试图让线程终止,然后结束调用进程并将控制权返回给 shell. 我试图理解为什么我必须在threading中使用私有方法.线程停止我的线程并将控制返回到命令行. self.join()不会结束程序.获得控制权的唯一方法是使用私有停止方法. def stop(self):
print "STOP CALLED"
self.finished.set()
print "SET DONE"
# self.join(timeout=None) does not work
self._Thread__stop()
这是我的代码的近似值: class CalcThread(threading.Thread):
def __init__(self,in_queue,out_queue,function):
threading.Thread.__init__(self)
self.in_queue = in_queue
self.out_queue = out_queue
self.function = function
self.finished = threading.Event()
def stop(self):
print "STOP CALLED"
self.finished.set()
print "SET DONE"
self._Thread__stop()
def run(self):
while not self.finished.isSet():
params_for_function = self.in_queue.get()
try:
tm = self.function(paramsforfunction)
self.in_queue.task_done()
self.out_queue.put(tm)
except ValueError as v:
#modify params and reinsert into queue
window = params_for_function["window"]
params_for_function["window"] = window + 1
self.in_queue.put(params_for_function)
def big_calculation(well_id,window,data_arrays):
# do some analysis to calculate tm
return tm
if __name__ == "__main__":
NUM_THREADS = 2
workers = []
in_queue = Queue()
out_queue = Queue()
for i in range(NUM_THREADS):
w = CalcThread(in_queue,big_calculation)
w.start()
workers.append(w)
if options.analyze_all:
for i in well_ids:
in_queue.put(dict(well_id=i,window=10,data_arrays=my_data_dict))
in_queue.join()
print "ALL THREADS SEEM TO BE DONE"
# gather data and report it from out_queue
for i in well_ids:
p = out_queue.get()
print p
out_queue.task_done()
# I had to do this to get the out_queue to proceed
if out_queue.qsize() == 0:
out_queue.join()
break
# Calling this stop method does not seem to return control to the command line unless I use threading.Thread private method
for aworker in workers:
aworker.stop()
解决方法通常,杀死修改共享资源的线程是个坏主意.除非在执行计算时释放GIL,否则多线程中的CPU密集型任务比Python中的无用任务更糟糕.许多numpy函数确实发布了GIL. ThreadPoolExecutor example from the docs import concurrent.futures # on Python 2.x: pip install futures
calc_args = []
if options.analyze_all:
calc_args.extend(dict(well_id=i,...) for i in well_ids)
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
future_to_args = dict((executor.submit(big_calculation,args),args)
for args in calc_args)
while future_to_args:
for future in concurrent.futures.as_completed(dict(**future_to_args)):
args = future_to_args.pop(future)
if future.exception() is not None:
print('%r generated an exception: %s' % (args,future.exception()))
if isinstance(future.exception(),ValueError):
#modify params and resubmit
args["window"] += 1
future_to_args[executor.submit(big_calculation,args)] = args
else:
print('f%r returned %r' % (args,future.result()))
print("ALL work SEEMs TO BE DONE")
如果没有共享状态,您可以用ProcessPoolExecutor替换ThreadPoolExecutor.将代码放在main()函数中. (编辑:安卓应用网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
