Compare commits
2 Commits
a2a17e5ff9
...
1c1ca14fe7
Author | SHA1 | Date | |
---|---|---|---|
1c1ca14fe7 | |||
9986fb5faa |
@ -1,40 +1,25 @@
|
||||
import asyncio
|
||||
|
||||
from quart import Quart, Response, websocket
|
||||
from typing import NoReturn
|
||||
|
||||
from glebby.connection_manager import ConnectionManager
|
||||
from glebby.model import Model
|
||||
|
||||
model = Model()
|
||||
connection_manager = ConnectionManager()
|
||||
model = Model(connection_manager)
|
||||
|
||||
# Files in ./static are served directly under / instead of under /static
|
||||
app = Quart(__name__, static_url_path='')
|
||||
|
||||
async def send_outgoing(queue: asyncio.Queue[str]) -> None:
|
||||
while True:
|
||||
data = await queue.get()
|
||||
await websocket.send(data)
|
||||
|
||||
# Root route should point to ./static/index.html
|
||||
@app.route('/')
|
||||
async def root() -> Response:
|
||||
return await app.send_static_file('index.html')
|
||||
|
||||
# Every time a websocket connection to /glebby is opened, a new connection
|
||||
# is set up in the connection manager. It will handle sending and receiving
|
||||
# and keeping the model up to date.
|
||||
@app.websocket('/glebby')
|
||||
async def ws() -> None:
|
||||
global model
|
||||
|
||||
# For each connected client, we create a queue to store its outgoing messages.
|
||||
# We pass this queue to the model so that it can communicate with the clients.
|
||||
outgoing_queue: asyncio.Queue[str] = asyncio.Queue()
|
||||
client_id = await model.add_client(outgoing_queue)
|
||||
|
||||
try:
|
||||
# In order to get the queued messages through the websocket, we need to create a task
|
||||
# since the current one will be busy receiving incoming messages below.
|
||||
outgoing_task = asyncio.create_task(send_outgoing(outgoing_queue))
|
||||
|
||||
while True:
|
||||
data = await websocket.receive()
|
||||
await model.handle_incoming(client_id, data)
|
||||
# When a client disconnects, we shut down the message pumping task and wait for it to finish
|
||||
except asyncio.CancelledError:
|
||||
await model.remove_client(client_id)
|
||||
outgoing_task.cancel()
|
||||
await outgoing_task
|
||||
async def ws() -> NoReturn:
|
||||
await connection_manager.setup_connection()
|
||||
|
73
glebby-server/glebby/connection_manager.py
Normal file
73
glebby-server/glebby/connection_manager.py
Normal file
@ -0,0 +1,73 @@
|
||||
import asyncio
|
||||
import itertools
|
||||
|
||||
from collections import OrderedDict
|
||||
from quart import websocket
|
||||
from typing import Any, AsyncGenerator, Awaitable, Callable, Iterator, NoReturn, Optional
|
||||
|
||||
# Represents a single websocket connection
|
||||
class Connection:
|
||||
outgoing_queue: asyncio.Queue[str]
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.outgoing_queue = asyncio.Queue()
|
||||
|
||||
# Sending a message simply puts it in the outgoing queue...
|
||||
async def send(self, data: str) -> None:
|
||||
await self.outgoing_queue.put(data)
|
||||
|
||||
# ... and this infinite loop takes care of actually sending the outgoing
|
||||
# messages through the websocket connection.
|
||||
async def _send_loop(self) -> None:
|
||||
while True:
|
||||
data = await self.outgoing_queue.get()
|
||||
await websocket.send(data)
|
||||
|
||||
class ConnectionManager:
|
||||
# Provides a *unique* identifier for each connection
|
||||
id_generator: Iterator[int]
|
||||
connections: OrderedDict[int, Connection]
|
||||
|
||||
on_open: Optional[Callable[[int], Awaitable[Any]]]
|
||||
on_message: Optional[Callable[[int, str], Awaitable[Any]]]
|
||||
on_close: Optional[Callable[[int], Awaitable[Any]]]
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.id_generator = itertools.count(start=0, step=1)
|
||||
self.connections = OrderedDict()
|
||||
|
||||
self.on_open = None
|
||||
self.on_message = None
|
||||
self.on_close = None
|
||||
|
||||
# Wires up asyncio tasks etc. such that on_open, on_message and on_close
|
||||
# are called at the right time with the right arguments.
|
||||
async def setup_connection(self) -> NoReturn:
|
||||
connection_id = next(self.id_generator)
|
||||
connection = Connection()
|
||||
self.connections[connection_id] = connection
|
||||
|
||||
if self.on_open:
|
||||
await self.on_open(connection_id)
|
||||
|
||||
try:
|
||||
send_task = asyncio.create_task(connection._send_loop())
|
||||
while True:
|
||||
data = await websocket.receive()
|
||||
if self.on_message:
|
||||
await self.on_message(connection_id, data)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
del self.connections[connection_id]
|
||||
send_task.cancel()
|
||||
await send_task
|
||||
|
||||
if self.on_close:
|
||||
await self.on_close(connection_id)
|
||||
|
||||
raise
|
||||
|
||||
# Interface for the model to send messages. The model shouldn't
|
||||
# use the Connection class at all.
|
||||
async def send_to(self, connection_id: int, data: str) -> None:
|
||||
await self.connections[connection_id].send(data)
|
@ -1,62 +1,60 @@
|
||||
import asyncio
|
||||
import itertools
|
||||
import json
|
||||
|
||||
from collections import OrderedDict
|
||||
from typing import Any, Iterator, Optional
|
||||
|
||||
from glebby.board import BoardGenerator
|
||||
from glebby.connection_manager import ConnectionManager
|
||||
|
||||
class Client:
|
||||
outgoing_queue: asyncio.Queue[str]
|
||||
id: int
|
||||
name: str
|
||||
|
||||
def __init__(self, client_id: int, outgoing_queue: asyncio.Queue[str]):
|
||||
self.outgoing_queue = outgoing_queue
|
||||
def __init__(self, client_id: int):
|
||||
self.id = client_id
|
||||
self.name = ''
|
||||
|
||||
async def put_outgoing_message(self, message: str) -> None:
|
||||
await self.outgoing_queue.put(message)
|
||||
|
||||
def get_dto(self) -> dict[str, Any]:
|
||||
return { 'id': self.id, 'name': self.id }
|
||||
return { 'id': self.id, 'name': self.name }
|
||||
|
||||
class Model:
|
||||
connection_manager: ConnectionManager
|
||||
clients: OrderedDict[int, Client]
|
||||
id_generator: Iterator[int]
|
||||
board_generator: BoardGenerator
|
||||
board: Optional[list[list[str]]]
|
||||
|
||||
def __init__(self) -> None:
|
||||
def __init__(self, connection_manager: ConnectionManager):
|
||||
self.connection_manager = connection_manager
|
||||
self.clients = OrderedDict()
|
||||
self.id_generator = itertools.count(start=0, step=1)
|
||||
self.board_generator = BoardGenerator(None)
|
||||
self.board = None
|
||||
|
||||
self.connection_manager.on_open = self.add_client
|
||||
self.connection_manager.on_message = self.handle_incoming
|
||||
self.connection_manager.on_close = self.remove_client
|
||||
|
||||
async def add_client(self, outgoing_queue: asyncio.Queue[str]) -> int:
|
||||
client_id = next(self.id_generator)
|
||||
await self.broadcast(client_id, { 'type': 'join' })
|
||||
|
||||
self.clients[client_id] = Client(client_id, outgoing_queue)
|
||||
print(f'<{client_id}|join>')
|
||||
async def add_client(self, client_id: int) -> None:
|
||||
print(f'<{client_id}|OPEN>')
|
||||
|
||||
await self.broadcast(client_id, {
|
||||
'type': 'join'
|
||||
})
|
||||
self.clients[client_id] = Client(client_id)
|
||||
await self.send_to(None, client_id, {
|
||||
'type': 'init',
|
||||
'state': self.get_state_dto(client_id)
|
||||
})
|
||||
|
||||
return client_id
|
||||
|
||||
async def remove_client(self, client_id: int) -> None:
|
||||
print(f'<{client_id}|CLOS>')
|
||||
|
||||
del self.clients[client_id]
|
||||
await self.broadcast(client_id, { 'type': 'leave' })
|
||||
|
||||
print(f'<{client_id}|leave>')
|
||||
await self.broadcast(client_id, {
|
||||
'type': 'leave'
|
||||
})
|
||||
|
||||
async def send_to(self, client_id_from: Optional[int], client_id_to: int, payload: Any) -> None:
|
||||
await self.clients[client_id_to].put_outgoing_message(json.dumps({
|
||||
await self.connection_manager.send_to(client_id_to, json.dumps({
|
||||
'from': client_id_from,
|
||||
'payload': payload
|
||||
}))
|
||||
@ -65,9 +63,24 @@ class Model:
|
||||
for client_id_to in self.clients:
|
||||
await self.send_to(client_id_from, client_id_to, payload)
|
||||
|
||||
async def handle_incoming(self, client_id: int, message: str) -> None:
|
||||
print(f'<{client_id}|data> {message}')
|
||||
payload = json.loads(message)
|
||||
async def handle_incoming(self, client_id: int, data: str) -> None:
|
||||
print(f'<{client_id}|DATA> {data}')
|
||||
|
||||
try:
|
||||
payload = json.loads(data)
|
||||
except:
|
||||
await self.send_to(None, client_id, {
|
||||
'type': 'error',
|
||||
'errorMessage': 'you sent invalid JSON!'
|
||||
})
|
||||
return
|
||||
|
||||
if 'type' not in payload:
|
||||
await self.send_to(None, client_id, {
|
||||
'type': 'error',
|
||||
'errorMessage': 'missing message type!'
|
||||
})
|
||||
return
|
||||
|
||||
if payload['type'] == 'set-name':
|
||||
self.clients[client_id].name = payload['name']
|
||||
|
Loading…
x
Reference in New Issue
Block a user