异步应用

异步设计模式与 WSGI 的同步特性不太兼容。这就是为什么大多数异步框架(tornado, twisted, ...)实现了一个专门的 API 来暴露它们的异步功能。Bottle 是一个 WSGI 框架,共享 WSGI 的同步特性,但由于出色的 gevent 项目,仍然可以使用 bottle 编写异步应用。本文档介绍了 Bottle 与异步 WSGI 的用法。

同步 WSGI 的局限性

简而言之,WSGI 规范 (pep 3333) 将请求/响应周期定义如下:针对每个请求调用一次应用程序可调用对象,并且必须返回一个响应体迭代器。然后服务器迭代响应体并将每个块写入套接字。一旦响应体迭代器被耗尽,客户端连接就会关闭。

这很简单,但有一个问题:所有这一切都是同步发生的。如果你的应用程序需要等待数据(IO、套接字、数据库等),它必须要么 yield 空字符串(忙等),要么阻塞当前线程。这两种解决方案都会占用处理线程,并阻止它响应新的请求。因此,每个线程只能处理一个正在进行的请求。

大多数服务器限制线程数量以避免其相对较高的开销。线程池通常为 20 个或更少。一旦所有线程都被占用,任何新的连接都会被阻塞。服务器实际上对其他人来说已经“死亡”了。如果你想实现一个使用长轮询 ajax 请求来获取实时更新的聊天应用,你在 20 个并发连接时就会达到限制。这是一个相当小的聊天应用。

Greenlets 来救援

大多数服务器由于切换和创建新线程涉及的高开销,将其工作线程池的大小限制在相对较低的并发线程数量。虽然线程比进程(forks)便宜,但为每个新连接创建它们仍然很昂贵。

gevent 模块引入了 greenlets。Greenlets 的行为类似于传统线程,但创建成本非常低廉。基于 gevent 的服务器可以以几乎零开销生成数千个 greenlets(每个连接一个)。阻塞单个 greenlet 对服务器接受新请求的能力没有影响。并发连接的数量几乎是无限的。

这使得创建异步应用变得异常简单,因为它们看起来和感觉起来就像同步应用。基于 gevent 的服务器实际上并不是异步的,而是海量多线程的。这里有一个例子

from gevent import monkey; monkey.patch_all()

from time import sleep
from bottle import route, run

@route('/stream')
def stream():
    yield 'START'
    sleep(3)
    yield 'MIDDLE'
    sleep(5)
    yield 'END'

run(host='0.0.0.0', port=8080, server='gevent')

第一行很重要。它导致 gevent 对大多数 Python 的阻塞 API 进行猴子补丁 (monkey-patch),使其不再阻塞当前线程,而是将 CPU 传递给下一个 greenlet。它实际上用基于 gevent 的伪线程替换了 Python 的 threading。这就是为什么你仍然可以使用通常会阻塞整个线程的 time.sleep()。如果你对猴子补丁 Python 内建函数感到不舒服,可以使用相应的 gevent 函数(本例中是 gevent.sleep())。

如果你运行这段脚本并让浏览器访问 http://localhost:8080/stream,你应该会看到 STARTMIDDLEEND 逐个出现(而不是等待 8 秒钟一次看到它们全部)。它的工作方式与普通线程完全相同,但现在你的服务器可以毫无问题地处理数千个并发请求。

注意

某些浏览器在开始渲染页面之前会缓冲一定量的数据。你可能需要 yield 超过几个字节才能在这些浏览器中看到效果。此外,许多浏览器对每个 URL 的并发连接数有限制。如果是这种情况,你可以使用第二个浏览器或基准测试工具(例如 abhttperf)来衡量性能。

事件回调

在异步框架(包括 tornado, twisted, node.js 等)中一个非常常见的设计模式是使用非阻塞 API 并将回调函数绑定到异步事件。套接字对象会一直保持打开状态直到被显式关闭,以便回调函数稍后可以向套接字写入数据。这里有一个基于 tornado 库的例子

class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        worker = SomeAsyncWorker()
        worker.on_data(lambda chunk: self.write(chunk))
        worker.on_finish(lambda: self.finish())

主要优点是请求处理程序提前终止。处理线程可以继续并接受新的请求,而回调函数则继续向先前请求的套接字写入数据。这就是这些框架如何仅使用少量操作系统线程就能处理大量并发请求的方式。

使用 Gevent+WSGI,情况有所不同:首先,提前终止没有任何好处,因为我们有无限多的(伪)线程来接受新的连接。其次,我们不能提前终止,因为那样会关闭套接字(WSGI 规范的要求)。第三,我们必须返回一个可迭代对象以符合 WSGI。

为了符合 WSGI 标准,我们所要做的就是返回一个我们可以异步写入的响应体可迭代对象。借助 gevent.queue,我们可以 模拟 一个分离的套接字,并将前面的例子改写如下

@route('/fetch')
def fetch():
    body = gevent.queue.Queue()
    worker = SomeAsyncWorker()
    worker.on_data(body.put)
    worker.on_finish(lambda: body.put(StopIteration))
    worker.start()
    return body

从服务器的角度来看,队列对象是可迭代的。如果为空则阻塞,一旦遇到 StopIteration 则停止。这符合 WSGI 规范。在应用程序端,队列对象的行为类似于非阻塞套接字。你可以在任何时候向它写入数据,传递它,甚至可以启动一个新的(伪)线程来异步向其写入数据。这通常是实现长轮询的方式。

最后:WebSockets

让我们暂时忘掉底层细节,来谈谈 WebSockets。既然你正在阅读这篇文章,你可能知道 WebSockets 是什么:它是浏览器(客户端)和 Web 应用程序(服务器)之间的双向通信通道。

幸运的是,gevent-websocket 包为我们完成了所有繁重的工作。这里是一个简单的 WebSocket 端点,它接收消息并将其发送回客户端

from bottle import request, Bottle, abort
app = Bottle()

@app.route('/websocket')
def handle_websocket():
    wsock = request.environ.get('wsgi.websocket')
    if not wsock:
        abort(400, 'Expected WebSocket request.')

    while True:
        try:
            message = wsock.receive()
            wsock.send("Your message was: %r" % message)
        except WebSocketError:
            break

from gevent.pywsgi import WSGIServer
from geventwebsocket import WebSocketError
from geventwebsocket.handler import WebSocketHandler
server = WSGIServer(("0.0.0.0", 8080), app,
                    handler_class=WebSocketHandler)
server.serve_forever()

while 循环会一直运行直到客户端关闭连接。你懂的 :)

客户端的 JavaScript API 也非常简单直观

<!DOCTYPE html>
<html>
<head>
  <script type="text/javascript">
    var ws = new WebSocket("ws://example.com:8080/websocket");
    ws.onopen = function() {
        ws.send("Hello, world");
    };
    ws.onmessage = function (evt) {
        alert(evt.data);
    };
  </script>
</head>
</html>