Coverage for backend/dependencies.py: 71%

70 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-17 17:55 +0000

1import asyncio 

2import contextlib 

3import itertools 

4import queue 

5import random 

6from collections.abc import Iterable 

7from functools import cache 

8 

9from .constants import Settings 

10from .game import BURGER_INGREDIENTS, start_main_loop 

11from .game_state import Lobby, LobbyClosedError, LobbyFullError, LobbyNotFoundError, Player, TaggedMessage 

12from .models import Message, PlayerCount 

13 

14 

15class Channel[S, R]: 

16 """Wrapper class around two queues for two-way communication.""" 

17 

18 def __init__(self, send: queue.Queue[S], recv: queue.Queue[R]) -> None: 

19 self._send = send 

20 self._recv = recv 

21 

22 def send(self, msg: S) -> None: 

23 """Send a message.""" 

24 # Can't raise queue.Full because we don't set a max size. 

25 self._send.put_nowait(msg) 

26 

27 def recv_nowait(self) -> R | None: 

28 """Receive a message or None if empty.""" 

29 return self._recv.get() 

30 

31 def recv(self) -> R: 

32 """Receive a message. Blocks until a message is available.""" 

33 return self._recv.get() 

34 

35 async def arecv(self) -> R: 

36 """Receive a message.""" 

37 while True: 

38 with contextlib.suppress(queue.Empty): 

39 return self._recv.get_nowait() 

40 

41 await asyncio.sleep(0.05) 

42 

43 

@cache
def settings() -> Settings:
    """
    Return the app settings.

    The result is cached, so ``Settings`` is instantiated only on the first
    call and the same object is returned afterwards.
    """
    app_settings = Settings()
    return app_settings

48 

49 

class LobbyManager:
    """Handle creation of lobbies and adding players to lobbies."""

    # Maximum number of players that may be registered in a single lobby.
    MAX_PLAYERS = 4

    def __init__(self, codes: Iterable[str]) -> None:
        """
        Build the pool of available join codes.

        Args:
            codes: Symbols that join codes are composed of. Every ordered
                combination (with repetition) of ``settings().code_length``
                symbols becomes one join code.
        """
        self.lobbies: dict[tuple[str, ...], Lobby] = {}
        all_codes = list(itertools.product(codes, repeat=settings().code_length))
        # Shuffle once so that popping from the end hands out codes in random order.
        random.shuffle(all_codes)
        self.available_codes: list[tuple[str, ...]] = all_codes

    def register_player(self, code: tuple[str, ...]) -> str:
        """
        Add a player to a lobby given a lobby join code.

        Args:
            code: The lobby join code.

        Raises:
            LobbyNotFoundError: The lobby does not exist.
            LobbyFullError: The lobby is full.
            LobbyClosedError: The lobby is no longer open.

        Returns:
            The id of the newly created player.
        """
        try:
            lobby = self.lobbies[code]
        except KeyError:
            # The KeyError carries no extra information, so drop it from the traceback.
            raise LobbyNotFoundError(f"Code {code} is not associated with any existing lobbies!") from None

        # ">=" rather than "==" so an over-full lobby is still rejected.
        if len(lobby.players) >= self.MAX_PLAYERS:
            raise LobbyFullError("Lobby is full.")

        if not lobby.open:
            raise LobbyClosedError("Lobby has already started.")

        # Each player gets their own unbounded queue.
        player = Player(channel=queue.Queue(), role=None)
        lobby.players[player.id] = player
        # Tell everyone in the lobby about the updated player list.
        lobby.broadcast(Message(data=PlayerCount(count=len(lobby.players), player_ids=list(lobby.players.keys()))))

        return player.id

    def register_lobby(self) -> tuple[str, ...]:
        """
        Create a new lobby and reserve a join code for it.

        The lobby's main loop is not started here; it is started on the first
        call to ``channel`` for this lobby.

        Returns:
            The lobby's join code

        Raises:
            RuntimeError: If no more codes are available
        """
        if not self.available_codes:
            raise RuntimeError("No more lobby codes available, server is full")

        code = self.available_codes.pop()
        self.lobbies[code] = Lobby(code)

        return code

    def delete_lobby(self, code: tuple[str, ...]) -> None:
        """
        Delete a lobby and recycle its code.

        Does nothing if no lobby is registered under ``code``.

        Args:
            code: The lobby join code.
        """
        if code in self.lobbies:
            del self.lobbies[code]
            # Return the code to the pool so a later lobby can reuse it.
            self.available_codes.append(code)

    def channel(self, code: tuple[str, ...], id: str) -> Channel[TaggedMessage, Message]:
        """
        Create a channel for sending and receiving messages to and from the lobby.

        The first call for a given lobby also starts that lobby's main loop.

        Args:
            code: Lobby join code.
            id: Player id.

        Returns:
            A channel that sends on the lobby's queue and receives from the
            player's own queue.

        Raises:
            KeyError: If no lobby is registered under ``code`` or the lobby
                has no player with the given ``id``.
        """
        lobby = self.lobbies[code]

        if not lobby.loop_started:
            # Set the flag before starting so later calls do not start it again.
            lobby.loop_started = True
            start_main_loop(lobby)

        return Channel(lobby.channel, lobby.players[id].channel)

138 

139 

# Module-level LobbyManager instance handed out by lobby_manager();
# its join codes are built from BURGER_INGREDIENTS.
_LobbyManager = LobbyManager(BURGER_INGREDIENTS)

142 

143 

def lobby_manager() -> LobbyManager:
    """
    Return the lobby manager dependency.

    Every call returns the same module-level ``_LobbyManager`` instance.
    """
    manager = _LobbyManager
    return manager
146 return _LobbyManager