协程开发下处理 tornado websocket 关闭事件

2021/02/10 技术

协程开发下处理 tornado websocket 关闭事件

验证代码如下:

import asyncio
import logging  # noqa
import os  # noqa
import typing  # noqa
import unittest
from threading import Thread

import arrow
import tornado.httpserver
import tornado.ioloop
import tornado.web
import websockets
from tornado.websocket import WebSocketHandler


class _Ws(WebSocketHandler):

    def check_origin(self, origin):
        """校验权限"""
        return True

    async def open(self):
        """开启连接"""
        print("on open")

    async def on_message(self, message):
        """连接通信"""
        print(f"on_message: {message}")


class WsHandler(_Ws):

    def on_close(self):
        """关闭连接"""
        print("on_closed")


def server(ws_handler_cls):
    asyncio.set_event_loop(asyncio.new_event_loop())

    app = tornado.web.Application(
        handlers=[
            (r'/ws', ws_handler_cls),
        ],
    )
    app.listen(8004)
    tornado.ioloop.IOLoop.instance().start()


class TestWs(unittest.TestCase):
    def setUp(self) -> None:
        pass

    def _req(self):
        """ """

        async def main():
            # 这些命令, 有问题
            version = arrow.now().to("local").format("YYYY-MM-DD HH:mm:ss")
            async with websockets.connect("ws://localhost:8004/ws") as ws:
                await ws.send(f"[{version}]start")
                await asyncio.sleep(2)
                await ws.send(f"[{version}]stop")

        for _ in range(5):
            asyncio.run(main())

    def testBasic(self):
        """ 非协程环境下, websocket 能接收到 关闭 事件 """
        Thread(target=server, args=(WsHandler,), daemon=True).start()

        self._req()

    def testAsyncioBad(self):
        """ 协程环境下, 使用 async def on_close 无法接收到 关闭 事件 """

        class Ws(_Ws):
            async def on_close(self):
                """关闭连接"""
                print("on_closed")
                await asyncio.sleep(1)
                print("print_close")

        Thread(target=server, args=(Ws,), daemon=True).start()

        self._req()

    def testAsyncioGood(self):
        """ 协程环境下, 使用 asyncio.ensure_future 能接收到 关闭 事件 """

        class Ws(_Ws):
            async def _close(self):
                """ """
                await asyncio.sleep(1)
                print("print_close")

            def on_close(self):
                """关闭连接"""
                print("on_closed")

                asyncio.ensure_future(self._close())

        Thread(target=server, args=(Ws,), daemon=True).start()

        self._req()

结论:

1) 协程环境下, 可以使用 async def get/on_message 等方法, 将接口方法变为 awaitable 方法. 但 on_close 不能使用这个样的方式处理

2) 协程方式执行on_close方法, 需要在方法内部, 使用 asyncio.ensure_future

Search

    Table of Contents