Websocket Server Transport
rsocket-py
supports multiple transport protocols. The Websocket transport is available by installing rsocket using one
of the extra features:
- pip3 install rsocket[aiohttp]
- pip3 install rsocket[quart]
In order to use a Websocket transport, instantiate a TransportAioHttpWebsocket
or TransportQuartWebsocket
and pass it to an RSocketServer
instance. There are a few helpers to ease the creation of a server or a client.
Examples
Example using aiohttp
:
import logging
import sys
from asyncio import Future
from aiohttp import web
from rsocket.helpers import create_future
from rsocket.payload import Payload
from rsocket.request_handler import BaseRequestHandler
from rsocket.transports.aiohttp_websocket import websocket_handler_factory
class Handler(BaseRequestHandler):
async def request_response(self, payload: Payload) -> Future:
return create_future(Payload(b'pong'))
if __name__ == '__main__':
port = sys.argv[1] if len(sys.argv) > 1 else 6565
logging.basicConfig(level=logging.DEBUG)
app = web.Application()
app.add_routes([web.get('/', websocket_handler_factory(handler_factory=Handler))])
web.run_app(app, port=port)
Example using quart
:
import logging
import sys
from asyncio import Future
from quart import Quart
from rsocket.helpers import create_future
from rsocket.payload import Payload
from rsocket.request_handler import BaseRequestHandler
from rsocket.transports.quart_websocket import websocket_handler
app = Quart(__name__)
class Handler(BaseRequestHandler):
async def request_response(self, payload: Payload) -> Future:
return create_future(Payload(b'pong'))
@app.websocket("/")
async def ws():
await websocket_handler(handler_factory=Handler)
if __name__ == "__main__":
port = sys.argv[1] if len(sys.argv) > 1 else 6565
logging.basicConfig(level=logging.DEBUG)
app.run(port=port)