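I am using CherryPy in a server to implement a REST-like API. The responses involve some heavy computation that takes about 2 seconds per request. The computation uses some data that is updated once a day.
The data is updated in the background (this takes about half an hour), and once the update is done, a reference to the new data is passed to the functions that respond to requests. This takes only about a millisecond.
What I need is to be sure that each request is answered either with the old data or with the new data, and that no request is being processed while the data reference is changing. Ideally, I would like to find a way to buffer incoming requests while the data reference is changed, and to ensure that the reference is changed only after all in-process requests have finished.
My current (non-)working minimal example is as follows: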
import time

import cherrypy
from cherrypy.process import plugins

theData = 0


def processData():
    """Background task that works for half an hour three times a day and,
    when it finishes, publishes the result in the engine buffer."""
    global theData  # using global variables to simplify the example
    theData += 1
    cherrypy.engine.publish("doChangeData", theData)


class DataPublisher(object):

    def __init__(self):
        self.data = 'initData'
        cherrypy.engine.subscribe('doChangeData', self.changeData)

    def changeData(self, newData):
        cherrypy.engine.log("Changing data,buffering should start!")
        self.data = newData
        # exaggeration of the 1 millisecond reference update, to visualize the problem
        time.sleep(1)
        cherrypy.engine.log("Continue serving buffered and new requests.")

    @cherrypy.expose
    def index(self):
        result = "I get " + str(self.data)
        cherrypy.engine.log(result)
        time.sleep(3)  # stands in for the heavy computation
        return result


if __name__ == '__main__':
    # server.* settings are global, so they belong in the 'global' section
    conf = {
        'global': {'server.socket_host': '127.0.0.1', 'server.socket_port': 8080}
    }
    cherrypy.config.update(conf)
    btask = plugins.BackgroundTask(5, processData)  # 5 secs for the example
    btask.start()
    cherrypy.quickstart(DataPublisher())
If I run this script, open a browser, go to localhost:8080 and refresh the page many times, I get:
...
[17/Sep/2015:21:32:41] ENGINE Changing data,buffering should start!
127.0.0.1 - - [17/Sep/2015:21:32:41] "GET / HTTP/1.1" 200 7 "...
[17/Sep/2015:21:32:42] ENGINE I get 3
[17/Sep/2015:21:32:42] ENGINE Continue serving buffered and new requests.
127.0.0.1 - - [17/Sep/2015:21:24:44] "GET / HTTP/1.1" 200 7 "...
...
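This means that some request processing starts before, and ends after, the moment the data reference starts or finishes changing. I want to avoid both cases. Something like: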
...
127.0.0.1 - - [17/Sep/2015:21:32:41] "GET / HTTP/1.1" 200 7 "...
[17/Sep/2015:21:32:41] ENGINE Changing data,buffering should start!
[17/Sep/2015:21:32:42] ENGINE Continue serving buffered and new requests.
[17/Sep/2015:21:32:42] ENGINE I get 3
127.0.0.1 - - [17/Sep/2015:21:24:44] "GET / HTTP/1.1" 200 7 "...
...
I searched the documentation and the web and found these references, which do not completely cover this case:
http://www.defuze.org/archives/198-managing-your-process-with-the-cherrypy-bus.html
How to execute asynchronous post-processing in CherryPy?
http://tools.cherrypy.org/wiki/BackgroundTaskQueue
Cherrypy : which solutions for pages with large processing time
How to stop request processing in Cherrypy?
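UPDATE (with a simple solution):
After giving it more thought, I think the question is misleading, because it includes some implementation requirements in the question itself, namely: stop processing and start buffering. For the problem, the requirement can be simplified to: make sure that each request is processed either with the old data or with the new data.
For the latter, it is enough to store a temporary local reference to the data in use. This reference can be used throughout the request processing, and it is not a problem if another thread changes self.data in the meantime. For Python objects, the garbage collector takes care of the old data once no request holds a reference to it any more.
Specifically, it is enough to change the index function as follows: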
    @cherrypy.expose
    def index(self):
        tempData = self.data  # local reference, taken once per request
        result = "I started with %s" % str(tempData)
        time.sleep(3)  # Heavy use of tempData
        result += " that changed to %s" % str(self.data)
        result += " but I am still using %s" % str(tempData)
        cherrypy.engine.log(result)
        return result
As a result, we will see in the log:
[21/Sep/2015:10:06:00] ENGINE I started with 1 that changed to 2 but I am still using 1
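The local-reference idea can also be tried without CherryPy. The following is a minimal sketch, not part of the original post: `Holder`, `handle_request` and `demo` are hypothetical names, and two `threading.Event` objects replace the sleeps so that the ordering is deterministic.

```python
import threading


class Holder:
    """Hypothetical stand-in for DataPublisher: it only holds a reference."""

    def __init__(self, data):
        self.data = data


def handle_request(holder, started, proceed):
    """Simulate one request: read the reference once, then use only the local copy."""
    snapshot = holder.data   # single attribute read; the reference is now local
    started.set()            # signal that the snapshot has been taken
    proceed.wait()           # stands in for the ~2 s of heavy computation
    return snapshot, holder.data


def demo():
    holder = Holder({"version": 1})
    started, proceed = threading.Event(), threading.Event()
    result = {}

    def worker():
        result["seen"] = handle_request(holder, started, proceed)

    thread = threading.Thread(target=worker)
    thread.start()
    started.wait()
    holder.data = {"version": 2}   # the update rebinds the attribute to a new object
    proceed.set()
    thread.join()
    return result["seen"]


if __name__ == "__main__":
    used, current = demo()
    print("request used", used, "while the current data is", current)
```

This relies on the updater rebinding `self.data` to a new object. If the background task mutated the existing object in place, the local reference would see the changes too.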
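I still want to keep the original (more restrictive) question and cyraxjoe's answer, because I find those solutions very useful.
Best answer
I'll explain two approaches that solve the problem.
The first one is plugin based.
The plugin-based approach still requires a kind of synchronization. It only works because there is just one BackgroundTask making the modification (and the modification is just one atomic operation).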
import time
import threading

import cherrypy
from cherrypy.process import plugins

UPDATE_INTERVAL = 0.5
REQUEST_DELAY = 0.1
UPDATE_DELAY = 0.1
THREAD_POOL_SIZE = 20

next_data = 1


class DataGateway(plugins.SimplePlugin):

    def __init__(self, bus):
        super(DataGateway, self).__init__(bus)
        self.data = next_data

    def start(self):
        self.bus.log("Starting DataGateway")
        self.bus.subscribe('dg:get', self._get_data)
        self.bus.subscribe('dg:update', self._update_data)
        self.bus.log("DataGateway has been started")

    def stop(self):
        self.bus.log("Stopping DataGateway")
        self.bus.unsubscribe('dg:get', self._get_data)
        self.bus.unsubscribe('dg:update', self._update_data)
        self.bus.log("DataGateway has been stopped")

    def _update_data(self, new_val):
        self.bus.log("Changing data,buffering should start!")
        self.data = new_val
        time.sleep(UPDATE_DELAY)
        self.bus.log("Continue serving buffered and new requests.")

    def _get_data(self):
        return self.data


def processData():
    """Background task that works for half an hour three times a day and,
    when it finishes, publishes the result in the engine buffer."""
    global next_data
    cherrypy.engine.publish("dg:update", next_data)
    next_data += 1


class DataPublisher(object):

    @property
    def data(self):
        # publish() returns one result per listener; take the only one
        return cherrypy.engine.publish('dg:get').pop()

    @cherrypy.expose
    def index(self):
        result = "I get " + str(self.data)
        cherrypy.engine.log(result)
        time.sleep(REQUEST_DELAY)
        return result


if __name__ == '__main__':
    conf = {
        'global': {
            'server.thread_pool': THREAD_POOL_SIZE,
            'server.socket_host': '127.0.0.1',
            'server.socket_port': 8080,
        }
    }
    cherrypy.config.update(conf)
    DataGateway(cherrypy.engine).subscribe()
    # run the update every UPDATE_INTERVAL seconds
    plugins.BackgroundTask(UPDATE_INTERVAL, processData).start()
    cherrypy.quickstart(DataPublisher())
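In this version the synchronization comes from the fact that both read & write operations are executed on the cherrypy.engine thread. Everything is abstracted in the DataGateway plugin; you just operate by publishing to the engine.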
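The `.pop()` in the `data` property is there because `cherrypy.engine.publish` returns a list with one return value per subscribed listener. The sketch below shows that pattern with a toy bus. `ToyBus` and `ToyGateway` are hypothetical stand-ins written for this illustration, not CherryPy's implementation.

```python
class ToyBus:
    """Toy publish/subscribe bus used only for illustration."""

    def __init__(self):
        self._listeners = {}

    def subscribe(self, channel, callback):
        self._listeners.setdefault(channel, []).append(callback)

    def publish(self, channel, *args, **kwargs):
        # Call every listener on the channel and collect the return values.
        return [cb(*args, **kwargs) for cb in self._listeners.get(channel, [])]


class ToyGateway:
    """Owns the data; all reads and writes go through bus channels."""

    def __init__(self, bus, initial):
        self.data = initial
        bus.subscribe('dg:get', self._get_data)
        bus.subscribe('dg:update', self._update_data)

    def _get_data(self):
        return self.data

    def _update_data(self, new_val):
        self.data = new_val


if __name__ == "__main__":
    bus = ToyBus()
    ToyGateway(bus, 1)
    print(bus.publish('dg:get'))        # a list with a single element
    bus.publish('dg:update', 2)
    print(bus.publish('dg:get').pop())  # the single element itself
```

With exactly one listener on `dg:get`, the returned list has one element, so `.pop()` yields the current data.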
The second approach uses a threading.Event object. It is a more manual approach, with the added benefit that it is probably faster, because reads are not executed on the cherrypy.engine thread.
threading.Event based (a.k.a. manual)
import time
import threading

import cherrypy
from cherrypy.process import plugins

UPDATE_INTERVAL = 0.5
REQUEST_DELAY = 0.1
UPDATE_DELAY = 0.1
THREAD_POOL_SIZE = 20

next_data = 1


def processData():
    """Background task that works for half an hour three times a day and,
    when it finishes, publishes the result in the engine buffer."""
    global next_data
    cherrypy.engine.publish("doChangeData", next_data)
    next_data += 1


class DataPublisher(object):

    def __init__(self):
        self._data = next_data
        self._data_readable = threading.Event()
        cherrypy.engine.subscribe('doChangeData', self.changeData)

    @property
    def data(self):
        if self._data_readable.is_set():
            return self._data
        else:
            # block until the writer marks the data as readable, then retry
            self._data_readable.wait()
            return self.data

    @data.setter
    def data(self, value):
        self._data_readable.clear()  # readers wait from here on
        time.sleep(UPDATE_DELAY)
        self._data = value
        self._data_readable.set()    # release the waiting readers

    def changeData(self, newData):
        cherrypy.engine.log("Changing data,buffering should start!")
        self.data = newData
        cherrypy.engine.log("Continue serving buffered and new requests.")

    @cherrypy.expose
    def index(self):
        result = "I get " + str(self.data)
        cherrypy.engine.log(result)
        time.sleep(REQUEST_DELAY)
        return result


if __name__ == '__main__':
    conf = {
        'global': {
            'server.thread_pool': THREAD_POOL_SIZE,
        }
    }
    cherrypy.config.update(conf)
    plugins.BackgroundTask(UPDATE_INTERVAL, processData).start()
    cherrypy.quickstart(DataPublisher())
I've added some niceties with the @property decorator, but the real gist is the threading.Event and the fact that the DataPublisher object is shared among the worker threads.
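The question's stricter requirement was that the reference changes only after all in-flight requests have finished. If that is really needed, one option is a readers-writer gate built on `threading.Condition`. The sketch below is not from the answer. `SwapGate`, `reading` and `swap` are hypothetical names, and it assumes a single updater thread, as in the examples above.

```python
import threading
from contextlib import contextmanager


class SwapGate:
    """Hypothetical readers-writer gate around one data reference."""

    def __init__(self, data):
        self._data = data
        self._cond = threading.Condition()
        self._active = 0        # requests currently using the data
        self._swapping = False  # True while a swap is pending or running

    @contextmanager
    def reading(self):
        with self._cond:
            # New requests are held here ("buffered") while a swap is pending.
            while self._swapping:
                self._cond.wait()
            self._active += 1
            data = self._data
        try:
            yield data
        finally:
            with self._cond:
                self._active -= 1
                self._cond.notify_all()

    def swap(self, new_data):
        """Replace the reference once no request is in flight (single updater assumed)."""
        with self._cond:
            self._swapping = True      # stop admitting new requests
            while self._active:        # wait for in-flight requests to finish
                self._cond.wait()
            self._data = new_data
            self._swapping = False
            self._cond.notify_all()    # release the held requests
```

A handler would wrap its work in `with gate.reading() as data:` and the background task would call `gate.swap(new_data)`. The trade-off is that an update has to wait for the slowest running request, which the simpler local-reference approach avoids.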
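I also added, in both examples, the thread pool configuration needed to increase the thread pool size. The default is 10.
As a way to test what I just said, you can execute this Python 3 script (if you don't have python3 yet, you now have an excuse to install it). It will make 100 requests more or less concurrently, given the thread pool.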
Test script
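The script itself is missing from this copy of the answer. As a stand-in, and not the author's original, here is a minimal sketch of such a test using only the standard library. `URL`, `TOTAL_REQUESTS`, `WORKERS`, `fetch` and `run` are assumed names, and the address assumes one of the example servers above is running locally on port 8080.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from urllib.request import urlopen

URL = "http://127.0.0.1:8080/"  # assumed address of the example server
TOTAL_REQUESTS = 100
WORKERS = 20                    # assumed to match THREAD_POOL_SIZE


def fetch(url):
    """Return the body of one GET request as text."""
    with urlopen(url) as response:
        return response.read().decode("utf-8")


def run(fetch_one, total=TOTAL_REQUESTS, workers=WORKERS):
    """Call fetch_one `total` times from `workers` threads and count the bodies."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        bodies = list(pool.map(lambda _: fetch_one(), range(total)))
    return Counter(bodies)


if __name__ == "__main__":
    # Print how many responses carried each data value.
    for body, count in sorted(run(lambda: fetch(URL)).items()):
        print(count, "x", body)
```

Each response body has the form "I get N", so the counts show how the 100 requests were spread across successive data values.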