Coverage for backend/dependencies.py: 89%

80 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-05-02 01:42 +0000

1import asyncio 

2import contextlib 

3import itertools 

4import queue 

5import random 

6from collections.abc import AsyncGenerator, Iterable 

7from functools import cache 

8 

9from fastapi import WebSocket, WebSocketDisconnect 

10 

11from .constants import Settings 

12from .game import BURGER_INGREDIENTS, start_main_loop 

13from .game_state import Lobby, LobbyClosedError, LobbyFullError, LobbyNotFoundError, Player, TaggedMessage 

14from .models import Message, PlayerCount 

15 

16 

17class Channel[S, R]: 

18 """Wrapper class around two queues for two-way communication.""" 

19 

20 def __init__(self, send: queue.Queue[S], recv: queue.Queue[R]) -> None: 

21 self._send = send 

22 self._recv = recv 

23 

24 def send(self, msg: S) -> None: 

25 """Send a message.""" 

26 # Can't raise queue.Full because we don't set a max size. 

27 self._send.put_nowait(msg) 

28 

29 def recv_nowait(self) -> R | None: 

30 """Receive a message or None if empty.""" 

31 return self._recv.get() 

32 

33 def recv(self) -> R: 

34 """Receive a message. Blocks until a message is available.""" 

35 return self._recv.get() 

36 

37 async def arecv(self) -> R: 

38 """Receive a message.""" 

39 while True: 

40 with contextlib.suppress(queue.Empty): 

41 return self._recv.get_nowait() 

42 

43 await asyncio.sleep(0.05) 

44 

45 

46@cache 

47def settings() -> Settings: 

48 """Return the app settings.""" 

49 return Settings() 

50 

51 

52class LobbyManager: 

53 """Handle creation of lobbies and adding players to lobbies.""" 

54 

55 def __init__(self, codes: Iterable[str]) -> None: 

56 self.lobbies: dict[tuple[str, ...], Lobby] = {} 

57 all_codes = list(itertools.product(codes, repeat=settings().code_length)) 

58 random.shuffle(all_codes) 

59 self.available_codes = all_codes 

60 

61 def register_player(self, code: tuple[str, ...]) -> str: 

62 """ 

63 Add a player to a lobby given a lobby join code. 

64 

65 Args: 

66 code: The lobby join code. 

67 

68 Raises: 

69 LobbyNotFound: The lobby does not exist. 

70 LobbyFull: The lobby is full. 

71 LobbyClosedError: The game has already started. 

72 

73 Returns: 

74 The id of the newly created player. 

75 """ 

76 try: 

77 lobby = self.lobbies[code] 

78 except KeyError: 

79 raise LobbyNotFoundError(f"Code {code} is not associated with any existing lobbies!") 

80 

81 if len(lobby.players) == 4: 

82 raise LobbyFullError("Lobby is full.") 

83 

84 if not lobby.open: 

85 raise LobbyClosedError("Lobby has already started.") 

86 

87 channel = queue.Queue() 

88 player = Player(channel=channel, role=None) 

89 lobby.players[player.id] = player 

90 lobby.broadcast(Message(data=PlayerCount(count=len(lobby.players), player_ids=list(lobby.players.keys())))) 

91 

92 return player.id 

93 

94 def register_lobby(self) -> tuple[str, ...]: 

95 """ 

96 Create a new lobby in its own thread. 

97 

98 Returns: 

99 The lobby's join code 

100 

101 Raises: 

102 RuntimeError: If no more codes are available 

103 """ 

104 if not self.available_codes: 

105 raise RuntimeError("No more lobby codes available, server is full") 

106 

107 code = self.available_codes.pop() 

108 lobby = Lobby(code) 

109 

110 self.lobbies[code] = lobby 

111 

112 return code 

113 

114 def delete_lobby(self, code: tuple[str, ...]) -> None: 

115 """ 

116 Delete a lobby and recycle its code. 

117 

118 Args: 

119 code: The lobby join code. 

120 """ 

121 if code in self.lobbies: 

122 del self.lobbies[code] 

123 self.available_codes.append(code) 

124 

125 def channel(self, code: tuple[str, ...], id: str) -> Channel[TaggedMessage, Message]: 

126 """ 

127 Create a channel for sending and receiving messages to and from the lobby. 

128 

129 Args: 

130 code: Lobby join code. 

131 id: Player id. 

132 """ 

133 lobby = self.lobbies[code] 

134 

135 if not lobby.loop_started: 

136 lobby.loop_started = True 

137 start_main_loop(lobby) 

138 

139 channel = Channel(lobby.channel, lobby.players[id].channel) 

140 return channel 

141 

142 

143_LobbyManager = LobbyManager(BURGER_INGREDIENTS) 

144 

145 

146def lobby_manager() -> LobbyManager: 

147 """Return the lobby manager dependency.""" 

148 return _LobbyManager 

149 

150 

151@contextlib.asynccontextmanager 

152async def connection_manager(websocket: WebSocket) -> AsyncGenerator[WebSocket, None]: 

153 """Handle websocket connect/disconnect.""" 

154 await websocket.accept() 

155 

156 try: 

157 yield websocket 

158 except WebSocketDisconnect: 

159 pass 

160 except Exception: 

161 await websocket.close()