RxPy Integration
The rsocket-py implementation doesn't use RxPy by default. The wrapper class RxRSocket
can be used to interact with RxPy (>= 3.2.0) entities (Observable, Observer).
Getting started
To use Rx with the rsocket client, instantiate an RxRSocket with an existing client or server instance:
import asyncio

from rsocket.helpers import single_transport_provider
from rsocket.rsocket_client import RSocketClient
from rsocket.rx_support.rx_rsocket import RxRSocket
from rsocket.transports.tcp import TransportTCP


async def main():
    """Connect an RSocketClient over TCP and wrap it in an RxRSocket."""
    # open_connection returns a (reader, writer) pair, unpacked into TransportTCP.
    connection = await asyncio.open_connection('localhost', 6565)

    async with RSocketClient(single_transport_provider(TransportTCP(*connection))) as client:
        # Wrap the existing client so requests can be made through the Rx API.
        rx_client = RxRSocket(client)
        ...  # Execute requests


if __name__ == '__main__':
    asyncio.run(main())
Examples
RxRSocket can be used as a context manager with a client which is not yet connected. It will close the underlying client when exiting the context. Example code:
import asyncio

from rsocket.helpers import single_transport_provider
from rsocket.rsocket_client import RSocketClient
from rsocket.rx_support.rx_rsocket import RxRSocket
from rsocket.transports.tcp import TransportTCP


async def main():
    """Use RxRSocket as the context manager around a client that is not yet connected."""
    # open_connection returns a (reader, writer) pair, unpacked into TransportTCP.
    connection = await asyncio.open_connection('localhost', 6565)
    # The client is only constructed here; it is not entered as a context manager itself.
    client = RSocketClient(single_transport_provider(TransportTCP(*connection)))

    # Leaving this context closes the underlying client as well.
    async with RxRSocket(client) as rx_client:
        ...  # Execute requests


if __name__ == '__main__':
    asyncio.run(main())
Request Response
from rsocket.payload import Payload
from rx import operators
received_message = await rx_client.request_response(
Payload(b'request text')
).pipe(
operators.map(lambda payload: payload.data),
operators.single()
)
Request Stream
from rsocket.payload import Payload
from rx import operators
received_messages = await rx_client.request_stream(
Payload(b'request text'),
request_limit=2
).pipe(
operators.map(lambda payload: payload.data),
operators.to_list()
)
Request Channel
from rsocket.payload import Payload
import rx
from rx import operators
sent_payloads = [Payload(data) for data in [b'1', b'2', b'3']]
received_messages = await rx_client.request_channel(
Payload(b'request text'),
observable=rx.from_list(sent_payloads),
request_limit=2
).pipe(
operators.map(lambda payload: payload.data),
operators.to_list()
)