This commit is contained in:
parent
9986fb5faa
commit
1c1ca14fe7
@ -1,6 +1,7 @@
|
||||
import asyncio
|
||||
|
||||
from quart import Quart, Response, websocket
|
||||
from typing import NoReturn
|
||||
|
||||
from glebby.connection_manager import ConnectionManager
|
||||
from glebby.model import Model
|
||||
@ -20,5 +21,5 @@ async def root() -> Response:
|
||||
# is set up in the connection manager. It will handle sending and receiving
|
||||
# and keeping the model up to date.
|
||||
@app.websocket('/glebby')
|
||||
async def ws() -> None:
|
||||
async def ws() -> NoReturn:
|
||||
await connection_manager.setup_connection()
|
||||
|
@ -3,17 +3,17 @@ import itertools
|
||||
|
||||
from collections import OrderedDict
|
||||
from quart import websocket
|
||||
from typing import AsyncGenerator, Iterator, Optional
|
||||
from typing import Any, AsyncGenerator, Awaitable, Callable, Iterator, NoReturn, Optional
|
||||
|
||||
# Represents a single websocket connection
|
||||
class Connection:
|
||||
outgoing_queue: asyncio.Queue[str]
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
self.outgoing_queue = asyncio.Queue()
|
||||
|
||||
# Sending a message simply puts it in the outgoing queue...
|
||||
async def send(self, data: str):
|
||||
async def send(self, data: str) -> None:
|
||||
await self.outgoing_queue.put(data)
|
||||
|
||||
# ... and this infinite loop takes care of actually sending the outgoing
|
||||
@ -28,7 +28,11 @@ class ConnectionManager:
|
||||
id_generator: Iterator[int]
|
||||
connections: OrderedDict[int, Connection]
|
||||
|
||||
def __init__(self):
|
||||
on_open: Optional[Callable[[int], Awaitable[Any]]]
|
||||
on_message: Optional[Callable[[int, str], Awaitable[Any]]]
|
||||
on_close: Optional[Callable[[int], Awaitable[Any]]]
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.id_generator = itertools.count(start=0, step=1)
|
||||
self.connections = OrderedDict()
|
||||
|
||||
@ -38,25 +42,30 @@ class ConnectionManager:
|
||||
|
||||
# Wires up asyncio tasks etc. such that on_open, on_message and on_close
|
||||
# are called at the right time with the right arguments.
|
||||
async def setup_connection(self):
|
||||
async def setup_connection(self) -> NoReturn:
|
||||
connection_id = next(self.id_generator)
|
||||
connection = Connection()
|
||||
self.connections[connection_id] = connection
|
||||
|
||||
await self.on_open(connection_id)
|
||||
if self.on_open:
|
||||
await self.on_open(connection_id)
|
||||
|
||||
try:
|
||||
send_task = asyncio.create_task(connection._send_loop())
|
||||
while True:
|
||||
data = await websocket.receive()
|
||||
await self.on_message(connection_id, data)
|
||||
if self.on_message:
|
||||
await self.on_message(connection_id, data)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
del self.connections[connection_id]
|
||||
send_task.cancel()
|
||||
await send_task
|
||||
|
||||
await self.on_close(connection_id)
|
||||
if self.on_close:
|
||||
await self.on_close(connection_id)
|
||||
|
||||
raise
|
||||
|
||||
# Interface for the model to send messages. The model shouldn't
|
||||
# use the Connection class at all.
|
||||
|
Loading…
x
Reference in New Issue
Block a user