From 9986fb5faabc6ffa27ed34c214ca2d132b2d132f Mon Sep 17 00:00:00 2001 From: Paul Brinkmeier Date: Fri, 20 Jan 2023 04:17:01 +0100 Subject: [PATCH] Seperate connection management into its own module --- glebby-server/glebby/__init__.py | 36 ++++-------- glebby-server/glebby/connection_manager.py | 64 +++++++++++++++++++++ glebby-server/glebby/model.py | 67 +++++++++++++--------- 3 files changed, 114 insertions(+), 53 deletions(-) create mode 100644 glebby-server/glebby/connection_manager.py diff --git a/glebby-server/glebby/__init__.py b/glebby-server/glebby/__init__.py index 2685f47..7f72fb1 100644 --- a/glebby-server/glebby/__init__.py +++ b/glebby-server/glebby/__init__.py @@ -2,39 +2,23 @@ import asyncio from quart import Quart, Response, websocket +from glebby.connection_manager import ConnectionManager from glebby.model import Model -model = Model() +connection_manager = ConnectionManager() +model = Model(connection_manager) + +# Files in ./static are served directly under / instead of under /static app = Quart(__name__, static_url_path='') -async def send_outgoing(queue: asyncio.Queue[str]) -> None: - while True: - data = await queue.get() - await websocket.send(data) - +# Root route should point to ./static/index.html @app.route('/') async def root() -> Response: return await app.send_static_file('index.html') +# Every time a websocket connection to /glebby is opened, a new connection +# is set up in the connection manager. It will handle sending and receiving +# and keeping the model up to date. @app.websocket('/glebby') async def ws() -> None: - global model - - # For each connected client, we create a queue to store its outgoing messages. - # We pass this queue to the model so that it can communicate with the clients. - outgoing_queue: asyncio.Queue[str] = asyncio.Queue() - client_id = await model.add_client(outgoing_queue) - - try: - # In order to get the queued messages through the websocket, we need to create a task - # since the current one will be busy receiving incoming messages below. - outgoing_task = asyncio.create_task(send_outgoing(outgoing_queue)) - - while True: - data = await websocket.receive() - await model.handle_incoming(client_id, data) - # When a client disconnects, we shut down the message pumping task and wait for it to finish - except asyncio.CancelledError: - await model.remove_client(client_id) - outgoing_task.cancel() - await outgoing_task + await connection_manager.setup_connection() diff --git a/glebby-server/glebby/connection_manager.py b/glebby-server/glebby/connection_manager.py new file mode 100644 index 0000000..821dc1a --- /dev/null +++ b/glebby-server/glebby/connection_manager.py @@ -0,0 +1,64 @@ +import asyncio +import itertools + +from collections import OrderedDict +from quart import websocket +from typing import AsyncGenerator, Iterator, Optional + +# Represents a single websocket connection +class Connection: + outgoing_queue: asyncio.Queue[str] + + def __init__(self): + self.outgoing_queue = asyncio.Queue() + + # Sending a message simply puts it in the outgoing queue... + async def send(self, data: str): + await self.outgoing_queue.put(data) + + # ... and this infinite loop takes care of actually sending the outgoing + # messages through the websocket connection. + async def _send_loop(self) -> None: + while True: + data = await self.outgoing_queue.get() + await websocket.send(data) + +class ConnectionManager: + # Provides a *unique* identifier for each connection + id_generator: Iterator[int] + connections: OrderedDict[int, Connection] + + def __init__(self): + self.id_generator = itertools.count(start=0, step=1) + self.connections = OrderedDict() + + self.on_open = None + self.on_message = None + self.on_close = None + + # Wires up asyncio tasks etc. such that on_open, on_message and on_close + # are called at the right time with the right arguments. + async def setup_connection(self): + connection_id = next(self.id_generator) + connection = Connection() + self.connections[connection_id] = connection + + await self.on_open(connection_id) + + try: + send_task = asyncio.create_task(connection._send_loop()) + while True: + data = await websocket.receive() + await self.on_message(connection_id, data) + + except asyncio.CancelledError: + del self.connections[connection_id] + send_task.cancel() + await send_task + + await self.on_close(connection_id) + + # Interface for the model to send messages. The model shouldn't + # use the Connection class at all. + async def send_to(self, connection_id: int, data: str) -> None: + await self.connections[connection_id].send(data) diff --git a/glebby-server/glebby/model.py b/glebby-server/glebby/model.py index 253fab4..cb9de55 100644 --- a/glebby-server/glebby/model.py +++ b/glebby-server/glebby/model.py @@ -1,62 +1,60 @@ -import asyncio -import itertools import json from collections import OrderedDict from typing import Any, Iterator, Optional from glebby.board import BoardGenerator +from glebby.connection_manager import ConnectionManager class Client: - outgoing_queue: asyncio.Queue[str] id: int name: str - def __init__(self, client_id: int, outgoing_queue: asyncio.Queue[str]): - self.outgoing_queue = outgoing_queue + def __init__(self, client_id: int): self.id = client_id self.name = '' - async def put_outgoing_message(self, message: str) -> None: - await self.outgoing_queue.put(message) - def get_dto(self) -> dict[str, Any]: - return { 'id': self.id, 'name': self.id } + return { 'id': self.id, 'name': self.name } class Model: + connection_manager: ConnectionManager clients: OrderedDict[int, Client] - id_generator: Iterator[int] board_generator: BoardGenerator board: Optional[list[list[str]]] - def __init__(self) -> None: + def __init__(self, connection_manager: ConnectionManager): + self.connection_manager = connection_manager self.clients = OrderedDict() - self.id_generator = itertools.count(start=0, step=1) self.board_generator = BoardGenerator(None) self.board = None + + self.connection_manager.on_open = self.add_client + self.connection_manager.on_message = self.handle_incoming + self.connection_manager.on_close = self.remove_client - async def add_client(self, outgoing_queue: asyncio.Queue[str]) -> int: - client_id = next(self.id_generator) - await self.broadcast(client_id, { 'type': 'join' }) - - self.clients[client_id] = Client(client_id, outgoing_queue) - print(f'<{client_id}|join>') + async def add_client(self, client_id: int) -> None: + print(f'<{client_id}|OPEN>') + await self.broadcast(client_id, { + 'type': 'join' + }) + self.clients[client_id] = Client(client_id) await self.send_to(None, client_id, { 'type': 'init', 'state': self.get_state_dto(client_id) }) - return client_id - async def remove_client(self, client_id: int) -> None: + print(f'<{client_id}|CLOS>') + del self.clients[client_id] - await self.broadcast(client_id, { 'type': 'leave' }) - - print(f'<{client_id}|leave>') + await self.broadcast(client_id, { + 'type': 'leave' + }) async def send_to(self, client_id_from: Optional[int], client_id_to: int, payload: Any) -> None: - await self.clients[client_id_to].put_outgoing_message(json.dumps({ + await self.connection_manager.send_to(client_id_to, json.dumps({ 'from': client_id_from, 'payload': payload })) @@ -65,9 +63,24 @@ class Model: for client_id_to in self.clients: await self.send_to(client_id_from, client_id_to, payload) - async def handle_incoming(self, client_id: int, message: str) -> None: - print(f'<{client_id}|data> {message}') - payload = json.loads(message) + async def handle_incoming(self, client_id: int, data: str) -> None: + print(f'<{client_id}|DATA> {data}') + + try: + payload = json.loads(data) + except: + await self.send_to(None, client_id, { + 'type': 'error', + 'errorMessage': 'you sent invalid JSON!' + }) + return + + if 'type' not in payload: + await self.send_to(None, client_id, { + 'type': 'error', + 'errorMessage': 'missing message type!' + }) + return if payload['type'] == 'set-name': self.clients[client_id].name = payload['name']