Statistics
As a last step, we will add the exchange of some statistics between the client and the server:
- The client will be able to send its memory usage to the server.
- The server will report the number of users and channels. The client will be able to specify which of these statistics it wants.
See resulting code on GitHub
Shared code
We will define some data-classes to represent the payloads being sent between the client and server:
from dataclasses import dataclass, field
from typing import Optional, List

@dataclass(frozen=True)
class ServerStatistics:
    user_count: Optional[int] = None
    channel_count: Optional[int] = None

@dataclass()
class ServerStatisticsRequest:
    ids: Optional[List[str]] = field(default_factory=lambda: ['users', 'channels'])
    period_seconds: Optional[int] = field(default_factory=lambda: 2)

@dataclass(frozen=True)
class ClientStatistics:
    memory_usage: Optional[int] = None
Lines 4-7 define the data sent to the client upon request. It contains two optional fields, the user count and the channel count.
Lines 9-12 define a request from the client which specifies which statistics it wants and how often they should be reported. The entries of the ids list correspond to the two fields of the ServerStatistics class.
Lines 14-16 define the statistics sent from the client to the server.
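To make the shape of these payloads concrete, here is a minimal sketch of how they might travel as JSON. The to_json and from_json helpers are hypothetical stand-ins written for this illustration; they are not the tutorial's encode_dataclass, whose implementation is not shown here. Two of the dataclasses are repeated so that the snippet runs on its own.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import List, Optional


@dataclass(frozen=True)
class ServerStatistics:
    user_count: Optional[int] = None
    channel_count: Optional[int] = None


@dataclass()
class ServerStatisticsRequest:
    ids: Optional[List[str]] = field(default_factory=lambda: ['users', 'channels'])
    period_seconds: Optional[int] = field(default_factory=lambda: 2)


def to_json(obj) -> bytes:
    # Hypothetical helper: serialize a dataclass instance to UTF-8 JSON.
    return json.dumps(asdict(obj)).encode('utf-8')


def from_json(cls, data: bytes):
    # Hypothetical helper: rebuild a dataclass instance from UTF-8 JSON.
    return cls(**json.loads(data.decode('utf-8')))


# A request created without arguments asks for both statistics every 2 seconds.
default_request = ServerStatisticsRequest()

# A response may fill only some fields; the others stay None.
users_only = ServerStatistics(user_count=3)
round_tripped = from_json(ServerStatistics, to_json(users_only))
```

Because every field is optional, a response that carries only the user count is still a valid ServerStatistics.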
Server side
We will add two endpoints, one for receiving statistics from the client, and one for requesting specific statistics from the server.
Data storage
First we will add a field on the UserSessionData to store the last statistics sent by the client:
from dataclasses import dataclass
from typing import Optional

from shared import ClientStatistics

@dataclass()
class UserSessionData:
    ...
    statistics: Optional[ClientStatistics] = None
We will add a helper method for creating a new statistics response:
def new_statistics_data(statistics_request: ServerStatisticsRequest):
    statistics_data = {}

    if 'users' in statistics_request.ids:
        statistics_data['user_count'] = len(chat_data.user_session_by_id)

    if 'channels' in statistics_request.ids:
        statistics_data['channel_count'] = len(chat_data.channel_messages)

    return ServerStatistics(**statistics_data)
The method creates a new ServerStatistics (Line 10) based on the ServerStatisticsRequest (Line 1). The client can request statistics about:
- User count (Lines 4-5)
- Channel count (Lines 7-8)
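The selection logic above can be tried in isolation. The sketch below repeats the helper and the two dataclasses, and replaces the tutorial's chat_data with a hypothetical FakeChatData object that has the same two attributes, so the numbers are made up for illustration only.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass(frozen=True)
class ServerStatistics:
    user_count: Optional[int] = None
    channel_count: Optional[int] = None


@dataclass()
class ServerStatisticsRequest:
    ids: Optional[List[str]] = field(default_factory=lambda: ['users', 'channels'])
    period_seconds: Optional[int] = field(default_factory=lambda: 2)


@dataclass()
class FakeChatData:
    # Hypothetical stand-in for the tutorial's chat_data storage.
    user_session_by_id: Dict[str, object] = field(default_factory=dict)
    channel_messages: Dict[str, list] = field(default_factory=dict)


chat_data = FakeChatData(
    user_session_by_id={'user1': object(), 'user2': object()},
    channel_messages={'general': []},
)


def new_statistics_data(statistics_request: ServerStatisticsRequest) -> ServerStatistics:
    statistics_data = {}

    # Only the requested ids are filled in; the rest keep their None default.
    if 'users' in statistics_request.ids:
        statistics_data['user_count'] = len(chat_data.user_session_by_id)

    if 'channels' in statistics_request.ids:
        statistics_data['channel_count'] = len(chat_data.channel_messages)

    return ServerStatistics(**statistics_data)


only_users = new_statistics_data(ServerStatisticsRequest(ids=['users']))
everything = new_statistics_data(ServerStatisticsRequest())
```

A statistic that was not requested is simply left as None in the response.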
Receive statistics
import json

from shared import ClientStatistics
from rsocket.helpers import utf8_decode
from rsocket.payload import Payload
from rsocket.routing.request_router import RequestRouter

class ChatUserSession:
    def router_factory(self):
        router = RequestRouter(payload_mapper=decode_payload)

        ...

        @router.fire_and_forget('statistics')
        async def receive_statistics(statistics: ClientStatistics):
            self._session.statistics = statistics
Lines 14-16 define an endpoint for receiving statistics from the client. It uses the fire-and-forget request type, since this data is not critical to the application. No return value is required from this method; if one is provided, it is ignored.
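The handler receives a ready-made ClientStatistics rather than raw bytes because the router is created with payload_mapper=decode_payload. The tutorial's decode_payload is not shown in this section, so the following is only a sketch of the idea. It assumes the mapper is called with the annotated type and the incoming payload, and that the payload data is UTF-8 JSON. FakePayload and decode_json_payload are hypothetical names.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ClientStatistics:
    memory_usage: Optional[int] = None


@dataclass(frozen=True)
class FakePayload:
    # Hypothetical stand-in for rsocket.payload.Payload; only `data` is modelled.
    data: bytes


def decode_json_payload(cls, payload: FakePayload):
    # Assumed mapper shape: (target type, payload) -> instance of the target type.
    return cls(**json.loads(payload.data.decode('utf-8')))


incoming = FakePayload(data=b'{"memory_usage": 123456}')
statistics = decode_json_payload(ClientStatistics, incoming)
```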
Send statistics
Next we define the endpoint for sending statistics to the client:
import asyncio
import json
import logging

from shared import ClientStatistics, ServerStatisticsRequest, ServerStatistics, encode_dataclass
from reactivestreams.publisher import DefaultPublisher
from reactivestreams.subscriber import Subscriber, DefaultSubscriber
from reactivestreams.subscription import DefaultSubscription
from rsocket.helpers import utf8_decode
from rsocket.payload import Payload
from rsocket.routing.request_router import RequestRouter

class ChatUserSession:
    def router_factory(self):
        router = RequestRouter(payload_mapper=decode_payload)
        ...

        @router.channel('statistics')
        async def send_statistics(request: ServerStatisticsRequest):

            class StatisticsChannel(DefaultPublisher, DefaultSubscriber, DefaultSubscription):

                def __init__(self, session: UserSessionData, requested_statistics: ServerStatisticsRequest):
                    super().__init__()
                    self._session = session
                    self._requested_statistics = requested_statistics

                def cancel(self):
                    self._sender.cancel()

                def subscribe(self, subscriber: Subscriber):
                    super().subscribe(subscriber)
                    subscriber.on_subscribe(self)
                    self._sender = asyncio.create_task(self._statistics_sender())

                async def _statistics_sender(self):
                    while True:
                        try:
                            await asyncio.sleep(self._requested_statistics.period_seconds)
                            next_message = new_statistics_data(self._requested_statistics)

                            self._subscriber.on_next(dataclass_to_payload(next_message))
                        except Exception:
                            logging.error('Statistics', exc_info=True)

                def on_next(self, value: Payload, is_complete=False):
                    request = ServerStatisticsRequest(**json.loads(utf8_decode(value.data)))

                    logging.info(f'Received statistics request {request.ids}, {request.period_seconds}')

                    if request.ids is not None:
                        self._requested_statistics.ids = request.ids

                    if request.period_seconds is not None:
                        self._requested_statistics.period_seconds = request.period_seconds

            response = StatisticsChannel(self._session, request)

            return response, response
Lines 18-59 define an endpoint for sending statistics to the client. It uses the request-channel request type, which allows the client both to receive the server statistics and to update the server as to which statistics it wants to receive.
The handler defined in this endpoint implements Publisher and Subscription as well as Subscriber. This is because it both sends and receives data.
This handler is similar to the one implemented to deliver messages to users.
In this handler, the _statistics_sender (Lines 36-44) asyncio task is started, and periodically sends server statistics to the client.
Unlike that handler, this one also inherits from DefaultSubscriber. From this interface we implement the on_next method (Lines 46-55), which consumes requests from the client to change the types of statistics the server sends and how often it sends them.
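The interplay between the periodic task, on_next and cancel can be modelled without rsocket. The sketch below is a simplified, hypothetical PeriodicSender (not the tutorial's StatisticsChannel) that uses the same pattern: a background asyncio task reads the current settings on every iteration, so an update takes effect on the next tick.

```python
import asyncio
from typing import Callable, List, Optional


class PeriodicSender:
    """Hypothetical, rsocket-free model of the periodic statistics sender."""

    def __init__(self, emit: Callable[[List[str]], None], ids: List[str], period_seconds: float):
        self._emit = emit
        self.ids = ids
        self.period_seconds = period_seconds
        self._task: Optional[asyncio.Task] = None

    def start(self):
        # Plays the role of subscribe(): start sending once a consumer exists.
        self._task = asyncio.create_task(self._run())

    def cancel(self):
        # Plays the role of cancel(): stop the background task.
        if self._task is not None:
            self._task.cancel()

    def update(self, ids=None, period_seconds=None):
        # Plays the role of on_next(): overwrite only the settings provided.
        if ids is not None:
            self.ids = ids
        if period_seconds is not None:
            self.period_seconds = period_seconds

    async def _run(self):
        while True:
            await asyncio.sleep(self.period_seconds)
            # The settings are read on every iteration, so updates apply immediately.
            self._emit(list(self.ids))


async def demo() -> List[List[str]]:
    received: List[List[str]] = []
    sender = PeriodicSender(received.append, ['users', 'channels'], 0.01)
    sender.start()
    await asyncio.sleep(0.1)
    sender.update(ids=['users'])
    await asyncio.sleep(0.1)
    sender.cancel()
    await asyncio.sleep(0.05)  # let the cancelled task finish
    return received
```

Running asyncio.run(demo()) collects a series of emissions that starts with both ids and ends with only 'users'.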
Client side
On the client side we will add the methods to access the new server side functionality:
- send_statistics
- listen_for_statistics
Send statistics
import resource

from shared import ServerStatistics, ClientStatistics, encode_dataclass
from rsocket.extensions.helpers import composite, route
from rsocket.payload import Payload

class ChatClient:

    async def send_statistics(self):
        memory_usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss

        payload = Payload(encode_dataclass(ClientStatistics(memory_usage=memory_usage)),
                          metadata=composite(route('statistics')))

        await self._rsocket.fire_and_forget(payload)
The send_statistics method uses a fire-and-forget request (Line 15) to send statistics to the server. This request does not receive a response, so the client does not wait for confirmation that the payload was delivered. This is acceptable because the information is not critical (at least for this tutorial).
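One detail worth keeping in mind is that ru_maxrss is the peak resident set size of the process, and its unit depends on the platform: kilobytes on Linux and bytes on macOS. The sketch below is an optional variant that normalizes the value to bytes. The peak_memory_bytes helper is a hypothetical name, other Unix-like systems are assumed to report kilobytes, and the resource module is not available on Windows.

```python
import resource
import sys


def peak_memory_bytes() -> int:
    # ru_maxrss is the peak resident set size of the current process.
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss

    # macOS reports bytes; Linux reports kilobytes (assumed for other platforms too).
    if sys.platform == 'darwin':
        return peak

    return peak * 1024


memory_usage = peak_memory_bytes()
```

The tutorial itself sends the raw value, which is fine as long as all clients run on the same platform.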
Receive statistics
Next we will request statistics from the server. First we will define a handler to listen on the channel request and control it:
import json
from asyncio import Event
from datetime import timedelta
from typing import List

from examples.tutorial.step6.models import ServerStatistics, ServerStatisticsRequest, dataclass_to_payload
from reactivestreams.publisher import DefaultPublisher
from reactivestreams.subscriber import DefaultSubscriber
from reactivestreams.subscription import DefaultSubscription
from rsocket.helpers import utf8_decode
from rsocket.payload import Payload

class StatisticsHandler(DefaultPublisher, DefaultSubscriber, DefaultSubscription):

    def __init__(self):
        super().__init__()
        self.done = Event()

    def on_next(self, value: Payload, is_complete=False):
        statistics = ServerStatistics(**json.loads(utf8_decode(value.data)))
        print(statistics)

        if is_complete:
            self.done.set()

    def cancel(self):
        self.subscription.cancel()

    def set_requested_statistics(self, ids: List[str]):
        self._subscriber.on_next(dataclass_to_payload(ServerStatisticsRequest(ids=ids)))

    def set_period(self, period: timedelta):
        self._subscriber.on_next(
            dataclass_to_payload(ServerStatisticsRequest(period_seconds=int(period.total_seconds()))))
This handler is similar to the server side, in that it implements Publisher, Subscriber and Subscription.
Instead of periodically sending messages, as the server did, two methods are added to explicitly send messages:
- set_requested_statistics is used to change the types of statistics sent.
- set_period is used to change the frequency at which statistics are sent.
on_next is implemented to print the statistics received from the server.
cancel can be used to stop receiving the statistics.
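Note that ServerStatisticsRequest has defaults that are not None, so a request built with only one argument still carries a value for the other field. The sketch below shows this with dataclasses.asdict. Whether the default actually reaches the server depends on how dataclass_to_payload serializes the object, which this section does not show, so treat the consequence as an assumption to verify.

```python
from dataclasses import asdict, dataclass, field
from typing import List, Optional


@dataclass()
class ServerStatisticsRequest:
    ids: Optional[List[str]] = field(default_factory=lambda: ['users', 'channels'])
    period_seconds: Optional[int] = field(default_factory=lambda: 2)


# Only ids is given, yet period_seconds is filled with its default of 2.
ids_only = asdict(ServerStatisticsRequest(ids=['users']))

# Only the period is given, yet ids is filled with both default entries.
period_only = asdict(ServerStatisticsRequest(period_seconds=5))

# Passing None explicitly marks a field as "not provided".
explicit = asdict(ServerStatisticsRequest(ids=['users'], period_seconds=None))
```

If every field is serialized, passing None explicitly for the field that should stay unchanged is one way to avoid resetting it on the server, since the server only applies fields that are not None.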
Next we will use this new handler in the ChatClient:
from rsocket.extensions.helpers import composite, route
from rsocket.payload import Payload

class ChatClient:

    def listen_for_statistics(self, request: ServerStatisticsRequest) -> StatisticsHandler:
        self._statistics_subscriber = StatisticsHandler()

        response = self._rsocket.request_channel(Payload(
            data=encode_dataclass(request),
            metadata=composite(route('statistics')
        )), publisher=self._statistics_subscriber)

        response.subscribe(self._statistics_subscriber)

        return self._statistics_subscriber
In Line 7 we instantiate a StatisticsHandler. We save this instance to later communicate over this channel.
Lines 9-12 send the request, and set the publisher as the instance of our handler. This ensures that requests on the handler are sent to the server.
Line 14 then connects the Publisher in the response to the handler, in order to use it to print the received statistics, using the Subscriber functionality of the handler.
Finally, on Line 16 we return the handler to allow the application flow to control and interact with the channel.
Test the new functionality
Finally, let's try out this new functionality in the client:
async def statistics_example(user1):
    await user1.send_statistics()

    statistics_control = user1.listen_for_statistics(
        ServerStatisticsRequest(period_seconds=5)
    )

    await asyncio.sleep(5)

    statistics_control.set_requested_statistics(['users'])

    await asyncio.sleep(5)

    statistics_control.cancel()
Call this new method from the client main method.