Compare commits

..

2 Commits

Author SHA1 Message Date
1c1ca14fe7 Add types
All checks were successful
continuous-integration/drone/push Build is passing
2023-01-20 04:27:47 +01:00
9986fb5faa Seperate connection management into its own module 2023-01-20 04:17:01 +01:00
3 changed files with 125 additions and 54 deletions

View File

@ -1,40 +1,25 @@
import asyncio import asyncio
from quart import Quart, Response, websocket from quart import Quart, Response, websocket
from typing import NoReturn
from glebby.connection_manager import ConnectionManager
from glebby.model import Model from glebby.model import Model
model = Model() connection_manager = ConnectionManager()
model = Model(connection_manager)
# Files in ./static are served directly under / instead of under /static
app = Quart(__name__, static_url_path='') app = Quart(__name__, static_url_path='')
async def send_outgoing(queue: asyncio.Queue[str]) -> None: # Root route should point to ./static/index.html
while True:
data = await queue.get()
await websocket.send(data)
@app.route('/') @app.route('/')
async def root() -> Response: async def root() -> Response:
return await app.send_static_file('index.html') return await app.send_static_file('index.html')
# Every time a websocket connection to /glebby is opened, a new connection
# is set up in the connection manager. It will handle sending and receiving
# and keeping the model up to date.
@app.websocket('/glebby') @app.websocket('/glebby')
async def ws() -> None: async def ws() -> NoReturn:
global model await connection_manager.setup_connection()
# For each connected client, we create a queue to store its outgoing messages.
# We pass this queue to the model so that it can communicate with the clients.
outgoing_queue: asyncio.Queue[str] = asyncio.Queue()
client_id = await model.add_client(outgoing_queue)
try:
# In order to get the queued messages through the websocket, we need to create a task
# since the current one will be busy receiving incoming messages below.
outgoing_task = asyncio.create_task(send_outgoing(outgoing_queue))
while True:
data = await websocket.receive()
await model.handle_incoming(client_id, data)
# When a client disconnects, we shut down the message pumping task and wait for it to finish
except asyncio.CancelledError:
await model.remove_client(client_id)
outgoing_task.cancel()
await outgoing_task

View File

@ -0,0 +1,73 @@
import asyncio
import itertools
from collections import OrderedDict
from quart import websocket
from typing import Any, AsyncGenerator, Awaitable, Callable, Iterator, NoReturn, Optional
# Represents a single websocket connection
class Connection:
outgoing_queue: asyncio.Queue[str]
def __init__(self) -> None:
self.outgoing_queue = asyncio.Queue()
# Sending a message simply puts it in the outgoing queue...
async def send(self, data: str) -> None:
await self.outgoing_queue.put(data)
# ... and this infinite loop takes care of actually sending the outgoing
# messages through the websocket connection.
async def _send_loop(self) -> None:
while True:
data = await self.outgoing_queue.get()
await websocket.send(data)
class ConnectionManager:
# Provides a *unique* identifier for each connection
id_generator: Iterator[int]
connections: OrderedDict[int, Connection]
on_open: Optional[Callable[[int], Awaitable[Any]]]
on_message: Optional[Callable[[int, str], Awaitable[Any]]]
on_close: Optional[Callable[[int], Awaitable[Any]]]
def __init__(self) -> None:
self.id_generator = itertools.count(start=0, step=1)
self.connections = OrderedDict()
self.on_open = None
self.on_message = None
self.on_close = None
# Wires up asyncio tasks etc. such that on_open, on_message and on_close
# are called at the right time with the right arguments.
async def setup_connection(self) -> NoReturn:
connection_id = next(self.id_generator)
connection = Connection()
self.connections[connection_id] = connection
if self.on_open:
await self.on_open(connection_id)
try:
send_task = asyncio.create_task(connection._send_loop())
while True:
data = await websocket.receive()
if self.on_message:
await self.on_message(connection_id, data)
except asyncio.CancelledError:
del self.connections[connection_id]
send_task.cancel()
await send_task
if self.on_close:
await self.on_close(connection_id)
raise
# Interface for the model to send messages. The model shouldn't
# use the Connection class at all.
async def send_to(self, connection_id: int, data: str) -> None:
await self.connections[connection_id].send(data)

View File

@ -1,62 +1,60 @@
import asyncio
import itertools
import json import json
from collections import OrderedDict from collections import OrderedDict
from typing import Any, Iterator, Optional from typing import Any, Iterator, Optional
from glebby.board import BoardGenerator from glebby.board import BoardGenerator
from glebby.connection_manager import ConnectionManager
class Client: class Client:
outgoing_queue: asyncio.Queue[str]
id: int id: int
name: str name: str
def __init__(self, client_id: int, outgoing_queue: asyncio.Queue[str]): def __init__(self, client_id: int):
self.outgoing_queue = outgoing_queue
self.id = client_id self.id = client_id
self.name = '' self.name = ''
async def put_outgoing_message(self, message: str) -> None:
await self.outgoing_queue.put(message)
def get_dto(self) -> dict[str, Any]: def get_dto(self) -> dict[str, Any]:
return { 'id': self.id, 'name': self.id } return { 'id': self.id, 'name': self.name }
class Model: class Model:
connection_manager: ConnectionManager
clients: OrderedDict[int, Client] clients: OrderedDict[int, Client]
id_generator: Iterator[int]
board_generator: BoardGenerator board_generator: BoardGenerator
board: Optional[list[list[str]]] board: Optional[list[list[str]]]
def __init__(self) -> None: def __init__(self, connection_manager: ConnectionManager):
self.connection_manager = connection_manager
self.clients = OrderedDict() self.clients = OrderedDict()
self.id_generator = itertools.count(start=0, step=1)
self.board_generator = BoardGenerator(None) self.board_generator = BoardGenerator(None)
self.board = None self.board = None
async def add_client(self, outgoing_queue: asyncio.Queue[str]) -> int: self.connection_manager.on_open = self.add_client
client_id = next(self.id_generator) self.connection_manager.on_message = self.handle_incoming
await self.broadcast(client_id, { 'type': 'join' }) self.connection_manager.on_close = self.remove_client
self.clients[client_id] = Client(client_id, outgoing_queue) async def add_client(self, client_id: int) -> None:
print(f'<{client_id}|join>') print(f'<{client_id}|OPEN>')
await self.broadcast(client_id, {
'type': 'join'
})
self.clients[client_id] = Client(client_id)
await self.send_to(None, client_id, { await self.send_to(None, client_id, {
'type': 'init', 'type': 'init',
'state': self.get_state_dto(client_id) 'state': self.get_state_dto(client_id)
}) })
return client_id
async def remove_client(self, client_id: int) -> None: async def remove_client(self, client_id: int) -> None:
del self.clients[client_id] print(f'<{client_id}|CLOS>')
await self.broadcast(client_id, { 'type': 'leave' })
print(f'<{client_id}|leave>') del self.clients[client_id]
await self.broadcast(client_id, {
'type': 'leave'
})
async def send_to(self, client_id_from: Optional[int], client_id_to: int, payload: Any) -> None: async def send_to(self, client_id_from: Optional[int], client_id_to: int, payload: Any) -> None:
await self.clients[client_id_to].put_outgoing_message(json.dumps({ await self.connection_manager.send_to(client_id_to, json.dumps({
'from': client_id_from, 'from': client_id_from,
'payload': payload 'payload': payload
})) }))
@ -65,9 +63,24 @@ class Model:
for client_id_to in self.clients: for client_id_to in self.clients:
await self.send_to(client_id_from, client_id_to, payload) await self.send_to(client_id_from, client_id_to, payload)
async def handle_incoming(self, client_id: int, message: str) -> None: async def handle_incoming(self, client_id: int, data: str) -> None:
print(f'<{client_id}|data> {message}') print(f'<{client_id}|DATA> {data}')
payload = json.loads(message)
try:
payload = json.loads(data)
except:
await self.send_to(None, client_id, {
'type': 'error',
'errorMessage': 'you sent invalid JSON!'
})
return
if 'type' not in payload:
await self.send_to(None, client_id, {
'type': 'error',
'errorMessage': 'missing message type!'
})
return
if payload['type'] == 'set-name': if payload['type'] == 'set-name':
self.clients[client_id].name = payload['name'] self.clients[client_id].name = payload['name']