以Python的Pyspider为例剖析搜索引擎的网络爬虫实现方法
|
在这篇文章中,我们将分析一个网络爬虫。 网络爬虫是一个扫描网络内容并记录其有用信息的工具。它能打开一大堆网页,分析每个页面的内容以便寻找所有感兴趣的数据,并将这些数据存储在一个数据库中,然后对其他网页进行同样的操作。 如果爬虫正在分析的网页中有一些链接,那么爬虫将会根据这些链接分析更多的页面。 搜索引擎就是基于这样的原理实现的。 这篇文章中,我特别选了一个稳定的、”年轻”的开源项目pyspider,它是由 binux 编码实现的。 注:据认为pyspider持续监控网络,它假定网页在一段时间后会发生变化,因此一段时间后它将会重新访问相同的网页。 概述 爬虫pyspider主要由四个组件组成。包括调度程序(scheduler),抓取程序(fetcher),内容处理程序(processor)以及一个监控组件。 调度程序接受任务并决定该做什么。这里有几种可能性,它可以丢弃一个任务(可能这个特定的网页刚刚被抓取过了),或者给任务分配不同的优先级。 当各个任务的优先级确定之后,它们被传入抓取程序。它重新抓取网页。这个过程很复杂,但逻辑上比较简单。 当网络上的资源被抓取下来,内容处理程序就负责抽取有用的信息。它运行一个用户编写的Python脚本,这个脚本并不像沙盒一样被隔离。它的职责还包括捕获异常或日志,并适当地管理它们。 最后,爬虫pyspider中有一个监控组件。 爬虫pyspider提供一个异常强大的网页界面(web ui),它允许你编辑和调试你的脚本,管理整个抓取过程,监控正在进行的任务,并最终输出结果。 项目和任务 在pyspider中,我们有项目和任务的概念。 一个任务指的是一个需要从网站检索并进行分析的单独页面。 一个项目指的是一个更大的实体,它包括爬虫涉及到的所有页面,分析网页所需要的python脚本,以及用于存储数据的数据库等等。 在pyspider中我们可以同时运行多个项目。 代码结构分析 根目录 在根目录中可以找到的文件夹有:
已经分析完项目的根目录了,仅根目录就能说明该项目是以一种非常专业的方式进行开发的。如果你正在开发任何的开源程序,希望你能达到这样的水准。 文件夹pyspider 让我们更深入一点儿,一起来分析实际的代码。 在这个文件夹中还能找到其他的文件夹,整个软件背后的逻辑已经被分割,以便更容易的进行管理和扩展。 这些文件夹是:database、fetcher、libs、processor、result、scheduler、webui。 在这个文件夹中我们也能找到整个项目的主入口点,run.py。 文件run.py 这个文件首先完成所有必需的杂事,以保证爬虫成功地运行。最终它产生所有必需的计算单元。向下滚动我们可以看到整个项目的入口点,cli()。 函数cli() 这个函数好像很复杂,但与我相随,你会发现它并没有你想象中复杂。函数cli()的主要目的是创建数据库和消息系统的所有连接。它主要解析命令行参数,并利用所有我们需要的东西创建一个大字典。最后,我们通过调用函数all()开始真正的工作。 函数all() 一个网络爬虫会进行大量的IO操作,因此一个好的想法是产生不同的线程或子进程来管理所有的这些工作。通过这种方式,你可以在等待网络获取你当前html页面的同时,提取前一个页面的有用信息。 函数all()决定是否运行子进程或者线程,然后调用不同的线程或子进程里的所有的必要函数。这时pyspider将产生包括webui在内的,爬虫的所有逻辑模块所需要的,足够数量的线程。当我们完成项目并关闭webui时,我们将干净漂亮地关闭每一个进程。 现在我们的爬虫就开始运行了,让我们进行更深入一点儿的探索。 调度程序 调度程序从两个不同的队列中获取任务(newtask_queue和status_queue),并把任务加入到另外一个队列(out_queue),这个队列稍后会被抓取程序读取。 调度程序做的第一件事情是从数据库中加载所需要完成的所有的任务。之后,它开始一个无限循环。在这个循环中会调用几个方法: 1._update_projects():尝试更新的各种设置,例如,我们想在爬虫工作的时候调整爬取速度。 2._check_task_done():分析已完成的任务并将其保存到数据库,它从status_queue中获取任务。 3._check_request():如果内容处理程序要求分析更多的页面,把这些页面放在队列newtask_queue中,该函数会从该队列中获得新的任务。 4._check_select():把新的网页加入到抓取程序的队列中。 5._check_delete():删除已被用户标记的任务和项目。 6._try_dump_cnt():记录一个文件中已完成任务的数量。对于防止程序异常所导致的数据丢失,这是有必要的。
def run(self):
while not self._quit:
try:
time.sleep(self.LOOP_INTERVAL)
self._update_projects()
self._check_task_done()
self._check_request()
while self._check_cronjob():
pass
self._check_select()
self._check_delete()
self._try_dump_cnt()
self._exceptions = 0
except KeyboardInterrupt:
break
except Exception as e:
logger.exception(e)
self._exceptions += 1
if self._exceptions > self.EXCEPTION_LIMIT:
break
continue
循环也会检查运行过程中的异常,或者我们是否要求python停止处理。 finally: # exit components run in subprocess for each in threads: if not each.is_alive(): continue if hasattr(each,'terminate'): each.terminate() each.join() 抓取程序 抓取程序的目的是检索网络资源。 pyspider能够处理普通HTML文本页面和基于AJAX的页面。只有抓取程序能意识到这种差异,了解这一点非常重要。我们将仅专注于普通的html文本抓取,然而大部分的想法可以很容易地移植到Ajax抓取器。 这里的想法在某种形式上类似于调度程序,我们有分别用于输入和输出的两个队列,以及一个大的循环。对于输入队列中的所有元素,抓取程序生成一个请求,并将结果放入输出队列中。 它听起来简单但有一个大问题。网络通常是极其缓慢的,如果因为等待一个网页而阻止了所有的计算,那么整个过程将会运行的极其缓慢。解决方法非常的简单,即不要在等待网络的时候阻塞所有的计算。这个想法即在网络上发送大量消息,并且相当一部分消息是同时发送的,然后异步等待响应的返回。一旦我们收回一个响应,我们将会调用另外的回调函数,回调函数将会以最适合的方式管理这样的响应。 爬虫pyspider中的所有的复杂的异步调度都是由另一个优秀的开源项目 http://www.tornadoweb.org/en/stable/ 完成。 现在我们的脑海里已经有了极好的想法了,让我们更深入地探索这是如何实现的。
def run(self):
def queue_loop():
if not self.outqueue or not self.inqueue:
return
while not self._quit:
try:
if self.outqueue.full():
break
task = self.inqueue.get_nowait()
task = utils.decode_unicode_obj(task)
self.fetch(task)
except queue.Empty:
break
tornado.ioloop.PeriodicCallback(queue_loop,100,io_loop=self.ioloop).start()
self._running = True
self.ioloop.start()
<strong>
函数run()</strong> 函数run()是抓取程序fetcher中的一个大的循环程序。 函数run()中定义了另外一个函数queue_loop(),该函数接收输入队列中的所有任务,并抓取它们。同时该函数也监听中断信号。函数queue_loop()作为参数传递给tornado的类PeriodicCallback,如你所猜,PeriodicCallback会每隔一段具体的时间调用一次queue_loop()函数。函数queue_loop()也会调用另一个能使我们更接近于实际检索Web资源操作的函数:fetch()。 函数fetch(self,task,callback=None) 网络上的资源必须使用函数phantomjs_fetch()或简单的http_fetch()函数检索,函数fetch()只决定检索该资源的正确方法是什么。接下来我们看一下函数http_fetch()。 函数http_fetch(self,url,callback) def http_fetch(self,callback): '''HTTP fetcher''' fetch = copy.deepcopy(self.default_options) fetch['url'] = url fetch['headers']['User-Agent'] = self.user_agent def handle_response(response): ... return task,result try: request = tornado.httpclient.HTTPRequest(header_callback=header_callback,**fetch) if self.async: self.http_client.fetch(request,handle_response) else: return handle_response(self.http_client.fetch(request)) 终于,这里才是完成真正工作的地方。这个函数的代码有点长,但有清晰的结构,容易阅读。 (编辑:安卓应用网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
