Compare commits

..

2 Commits

Author SHA1 Message Date
1c1ca14fe7 Add types
All checks were successful
continuous-integration/drone/push Build is passing
2023-01-20 04:27:47 +01:00
9986fb5faa Seperate connection management into its own module 2023-01-20 04:17:01 +01:00
3 changed files with 125 additions and 54 deletions

View File

@ -1,40 +1,25 @@
import asyncio
from quart import Quart, Response, websocket
from typing import NoReturn
from glebby.connection_manager import ConnectionManager
from glebby.model import Model
model = Model()
connection_manager = ConnectionManager()
model = Model(connection_manager)
# Files in ./static are served directly under / instead of under /static
app = Quart(__name__, static_url_path='')
async def send_outgoing(queue: asyncio.Queue[str]) -> None:
while True:
data = await queue.get()
await websocket.send(data)
# Root route should point to ./static/index.html
@app.route('/')
async def root() -> Response:
return await app.send_static_file('index.html')
# Every time a websocket connection to /glebby is opened, a new connection
# is set up in the connection manager. It will handle sending and receiving
# and keeping the model up to date.
@app.websocket('/glebby')
async def ws() -> None:
global model
# For each connected client, we create a queue to store its outgoing messages.
# We pass this queue to the model so that it can communicate with the clients.
outgoing_queue: asyncio.Queue[str] = asyncio.Queue()
client_id = await model.add_client(outgoing_queue)
try:
# In order to get the queued messages through the websocket, we need to create a task
# since the current one will be busy receiving incoming messages below.
outgoing_task = asyncio.create_task(send_outgoing(outgoing_queue))
while True:
data = await websocket.receive()
await model.handle_incoming(client_id, data)
# When a client disconnects, we shut down the message pumping task and wait for it to finish
except asyncio.CancelledError:
await model.remove_client(client_id)
outgoing_task.cancel()
await outgoing_task
async def ws() -> NoReturn:
await connection_manager.setup_connection()

View File

@ -0,0 +1,73 @@
import asyncio
import itertools
from collections import OrderedDict
from quart import websocket
from typing import Any, AsyncGenerator, Awaitable, Callable, Iterator, NoReturn, Optional
# Represents a single websocket connection
class Connection:
outgoing_queue: asyncio.Queue[str]
def __init__(self) -> None:
self.outgoing_queue = asyncio.Queue()
# Sending a message simply puts it in the outgoing queue...
async def send(self, data: str) -> None:
await self.outgoing_queue.put(data)
# ... and this infinite loop takes care of actually sending the outgoing
# messages through the websocket connection.
async def _send_loop(self) -> None:
while True:
data = await self.outgoing_queue.get()
await websocket.send(data)
class ConnectionManager:
# Provides a *unique* identifier for each connection
id_generator: Iterator[int]
connections: OrderedDict[int, Connection]
on_open: Optional[Callable[[int], Awaitable[Any]]]
on_message: Optional[Callable[[int, str], Awaitable[Any]]]
on_close: Optional[Callable[[int], Awaitable[Any]]]
def __init__(self) -> None:
self.id_generator = itertools.count(start=0, step=1)
self.connections = OrderedDict()
self.on_open = None
self.on_message = None
self.on_close = None
# Wires up asyncio tasks etc. such that on_open, on_message and on_close
# are called at the right time with the right arguments.
async def setup_connection(self) -> NoReturn:
connection_id = next(self.id_generator)
connection = Connection()
self.connections[connection_id] = connection
if self.on_open:
await self.on_open(connection_id)
try:
send_task = asyncio.create_task(connection._send_loop())
while True:
data = await websocket.receive()
if self.on_message:
await self.on_message(connection_id, data)
except asyncio.CancelledError:
del self.connections[connection_id]
send_task.cancel()
await send_task
if self.on_close:
await self.on_close(connection_id)
raise
# Interface for the model to send messages. The model shouldn't
# use the Connection class at all.
async def send_to(self, connection_id: int, data: str) -> None:
await self.connections[connection_id].send(data)

View File

@ -1,62 +1,60 @@
import asyncio
import itertools
import json
from collections import OrderedDict
from typing import Any, Iterator, Optional
from glebby.board import BoardGenerator
from glebby.connection_manager import ConnectionManager
class Client:
outgoing_queue: asyncio.Queue[str]
id: int
name: str
def __init__(self, client_id: int, outgoing_queue: asyncio.Queue[str]):
self.outgoing_queue = outgoing_queue
def __init__(self, client_id: int):
self.id = client_id
self.name = ''
async def put_outgoing_message(self, message: str) -> None:
await self.outgoing_queue.put(message)
def get_dto(self) -> dict[str, Any]:
return { 'id': self.id, 'name': self.id }
return { 'id': self.id, 'name': self.name }
class Model:
connection_manager: ConnectionManager
clients: OrderedDict[int, Client]
id_generator: Iterator[int]
board_generator: BoardGenerator
board: Optional[list[list[str]]]
def __init__(self) -> None:
def __init__(self, connection_manager: ConnectionManager):
self.connection_manager = connection_manager
self.clients = OrderedDict()
self.id_generator = itertools.count(start=0, step=1)
self.board_generator = BoardGenerator(None)
self.board = None
self.connection_manager.on_open = self.add_client
self.connection_manager.on_message = self.handle_incoming
self.connection_manager.on_close = self.remove_client
async def add_client(self, outgoing_queue: asyncio.Queue[str]) -> int:
client_id = next(self.id_generator)
await self.broadcast(client_id, { 'type': 'join' })
self.clients[client_id] = Client(client_id, outgoing_queue)
print(f'<{client_id}|join>')
async def add_client(self, client_id: int) -> None:
print(f'<{client_id}|OPEN>')
await self.broadcast(client_id, {
'type': 'join'
})
self.clients[client_id] = Client(client_id)
await self.send_to(None, client_id, {
'type': 'init',
'state': self.get_state_dto(client_id)
})
return client_id
async def remove_client(self, client_id: int) -> None:
print(f'<{client_id}|CLOS>')
del self.clients[client_id]
await self.broadcast(client_id, { 'type': 'leave' })
print(f'<{client_id}|leave>')
await self.broadcast(client_id, {
'type': 'leave'
})
async def send_to(self, client_id_from: Optional[int], client_id_to: int, payload: Any) -> None:
await self.clients[client_id_to].put_outgoing_message(json.dumps({
await self.connection_manager.send_to(client_id_to, json.dumps({
'from': client_id_from,
'payload': payload
}))
@ -65,9 +63,24 @@ class Model:
for client_id_to in self.clients:
await self.send_to(client_id_from, client_id_to, payload)
async def handle_incoming(self, client_id: int, message: str) -> None:
print(f'<{client_id}|data> {message}')
payload = json.loads(message)
async def handle_incoming(self, client_id: int, data: str) -> None:
print(f'<{client_id}|DATA> {data}')
try:
payload = json.loads(data)
except:
await self.send_to(None, client_id, {
'type': 'error',
'errorMessage': 'you sent invalid JSON!'
})
return
if 'type' not in payload:
await self.send_to(None, client_id, {
'type': 'error',
'errorMessage': 'missing message type!'
})
return
if payload['type'] == 'set-name':
self.clients[client_id].name = payload['name']