Coverage for backend/dependencies.py: 89%
80 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-05-02 01:42 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-05-02 01:42 +0000
1import asyncio
2import contextlib
3import itertools
4import queue
5import random
6from collections.abc import AsyncGenerator, Iterable
7from functools import cache
9from fastapi import WebSocket, WebSocketDisconnect
11from .constants import Settings
12from .game import BURGER_INGREDIENTS, start_main_loop
13from .game_state import Lobby, LobbyClosedError, LobbyFullError, LobbyNotFoundError, Player, TaggedMessage
14from .models import Message, PlayerCount
17class Channel[S, R]:
18 """Wrapper class around two queues for two-way communication."""
20 def __init__(self, send: queue.Queue[S], recv: queue.Queue[R]) -> None:
21 self._send = send
22 self._recv = recv
24 def send(self, msg: S) -> None:
25 """Send a message."""
26 # Can't raise queue.Full because we don't set a max size.
27 self._send.put_nowait(msg)
29 def recv_nowait(self) -> R | None:
30 """Receive a message or None if empty."""
31 return self._recv.get()
33 def recv(self) -> R:
34 """Receive a message. Blocks until a message is available."""
35 return self._recv.get()
37 async def arecv(self) -> R:
38 """Receive a message."""
39 while True:
40 with contextlib.suppress(queue.Empty):
41 return self._recv.get_nowait()
43 await asyncio.sleep(0.05)
46@cache
47def settings() -> Settings:
48 """Return the app settings."""
49 return Settings()
52class LobbyManager:
53 """Handle creation of lobbies and adding players to lobbies."""
55 def __init__(self, codes: Iterable[str]) -> None:
56 self.lobbies: dict[tuple[str, ...], Lobby] = {}
57 all_codes = list(itertools.product(codes, repeat=settings().code_length))
58 random.shuffle(all_codes)
59 self.available_codes = all_codes
61 def register_player(self, code: tuple[str, ...]) -> str:
62 """
63 Add a player to a lobby given a lobby join code.
65 Args:
66 code: The lobby join code.
68 Raises:
69 LobbyNotFound: The lobby does not exist.
70 LobbyFull: The lobby is full.
71 LobbyClosedError: The game has already started.
73 Returns:
74 The id of the newly created player.
75 """
76 try:
77 lobby = self.lobbies[code]
78 except KeyError:
79 raise LobbyNotFoundError(f"Code {code} is not associated with any existing lobbies!")
81 if len(lobby.players) == 4:
82 raise LobbyFullError("Lobby is full.")
84 if not lobby.open:
85 raise LobbyClosedError("Lobby has already started.")
87 channel = queue.Queue()
88 player = Player(channel=channel, role=None)
89 lobby.players[player.id] = player
90 lobby.broadcast(Message(data=PlayerCount(count=len(lobby.players), player_ids=list(lobby.players.keys()))))
92 return player.id
94 def register_lobby(self) -> tuple[str, ...]:
95 """
96 Create a new lobby in its own thread.
98 Returns:
99 The lobby's join code
101 Raises:
102 RuntimeError: If no more codes are available
103 """
104 if not self.available_codes:
105 raise RuntimeError("No more lobby codes available, server is full")
107 code = self.available_codes.pop()
108 lobby = Lobby(code)
110 self.lobbies[code] = lobby
112 return code
114 def delete_lobby(self, code: tuple[str, ...]) -> None:
115 """
116 Delete a lobby and recycle its code.
118 Args:
119 code: The lobby join code.
120 """
121 if code in self.lobbies:
122 del self.lobbies[code]
123 self.available_codes.append(code)
125 def channel(self, code: tuple[str, ...], id: str) -> Channel[TaggedMessage, Message]:
126 """
127 Create a channel for sending and receiving messages to and from the lobby.
129 Args:
130 code: Lobby join code.
131 id: Player id.
132 """
133 lobby = self.lobbies[code]
135 if not lobby.loop_started:
136 lobby.loop_started = True
137 start_main_loop(lobby)
139 channel = Channel(lobby.channel, lobby.players[id].channel)
140 return channel
143_LobbyManager = LobbyManager(BURGER_INGREDIENTS)
146def lobby_manager() -> LobbyManager:
147 """Return the lobby manager dependency."""
148 return _LobbyManager
151@contextlib.asynccontextmanager
152async def connection_manager(websocket: WebSocket) -> AsyncGenerator[WebSocket, None]:
153 """Handle websocket connect/disconnect."""
154 await websocket.accept()
156 try:
157 yield websocket
158 except WebSocketDisconnect:
159 pass
160 except Exception:
161 await websocket.close()