Zappy - Year end project 2
This is a project that Epitech asked us to create in order to allow us to review the notions of the current year.
Loading...
Searching...
No Matches
logistics.py
Go to the documentation of this file.
7
8import sys
9from datetime import datetime
10
11from convert_data import ConvertData
12from global_variables import GlobalVariables, Commands, Items
13from custom_functions import pinfo, psuccess, perror, pdebug, pwarning
14
15
17 """_summary_
18 This is the class in charge of containing the ai which provides a purpose for this program
19
20 Args:
21 Thread (_type_): _description_: The class that is to be used in order to start threads
22 """
23
24 def __init__(self, global_variables: GlobalVariables) -> None:
25 self.global_variables = global_variables
26 self.global_variables.ai_status = self.global_variables.success
27 self.client_number = ""
29 self.welcome_received = False
30 self.client_number_received: bool = False
32 self.global_variables.ai_ready = True
33 self.nb_responses = 0
34 self.tile_content = []
36 self.sabotage = False
37 self.direction_options_list = ["forward", "left", "right"]
39 "forward": self._forward_forward,
40 "left": self._left_left,
41 "right": self._right
42 }
43
def _create_random_seed(self) -> int:
    """Build a seed from the current wall-clock time.

    The seed is the sum of the hour, minute, second and microsecond fields
    of ``datetime.now()``. The value is also reported through ``pwarning``.

    Returns:
        int: The generated seed.
    """
    now = datetime.now()
    seed = sum((now.hour, now.minute, now.second, now.microsecond))
    pwarning(self.global_variables, f"Generated seed: {seed}")
    return seed
54
def _forward(self) -> None:
    """Queue the forward command in the response buffer.

    The text registered for ``Commands.FORWARD`` is passed through
    ``ConvertData.to_external()``, logged, then appended to
    ``global_variables.response_buffer``.
    """
    shared = self.global_variables
    command_text = shared.translation_reference.text_equivalence[Commands.FORWARD]
    converter = ConvertData(
        data=command_text,
        error=shared.error,
        success=shared.success,
    )
    outgoing = converter.to_external()
    pinfo(shared, f"Command sent: {outgoing}")
    shared.response_buffer.append(outgoing)
67
def _left(self) -> None:
    """Queue the left command in the response buffer.

    The text registered for ``Commands.LEFT`` is passed through
    ``ConvertData.to_external()``, logged, then appended to
    ``global_variables.response_buffer``.
    """
    shared = self.global_variables
    command_text = shared.translation_reference.text_equivalence[Commands.LEFT]
    converter = ConvertData(
        data=command_text,
        error=shared.error,
        success=shared.success,
    )
    outgoing = converter.to_external()
    pinfo(shared, f"Command sent: {outgoing}")
    shared.response_buffer.append(outgoing)
80
def _right(self) -> None:
    """Queue the right command in the response buffer.

    The text registered for ``Commands.RIGHT`` is passed through
    ``ConvertData.to_external()``, logged, then appended to
    ``global_variables.response_buffer``.
    """
    shared = self.global_variables
    command_text = shared.translation_reference.text_equivalence[Commands.RIGHT]
    converter = ConvertData(
        data=command_text,
        error=shared.error,
        success=shared.success,
    )
    outgoing = converter.to_external()
    pinfo(shared, f"Command sent: {outgoing}")
    shared.response_buffer.append(outgoing)
93
def _my_randint(self, minimum: int = 0, maximum: int = 3) -> int:
    """Return a pseudo-random integer in ``[minimum, maximum]`` without the random library.

    The value is derived from the current wall-clock time (hour + minute +
    second + microsecond) plus ``self.random_seed``, folded into the
    requested range with a modulo.

    The previous implementation reduced the value modulo ``maximum + 1`` and
    then rejected anything below ``minimum``, retrying up to 200 times. As a
    result a non-zero ``minimum`` was never honoured as a true lower bound,
    and ``maximum == -1`` raised ``ZeroDivisionError``. Results for
    ``minimum == 0`` are unchanged.

    Args:
        minimum (int): Inclusive lower bound. Defaults to 0.
        maximum (int): Inclusive upper bound. Defaults to 3.

    Returns:
        int: A number between ``minimum`` and ``maximum`` (both included),
        or ``minimum`` when the range is empty (``maximum < minimum``).
    """
    span = maximum - minimum + 1
    if span <= 0:
        # Empty range: keep the historical fallback value instead of
        # dividing by zero or looping for nothing.
        return minimum
    pdebug(self.global_variables, "(logistics) In random")
    c_time = datetime.now()
    current_time = c_time.hour + c_time.minute + c_time.second + c_time.microsecond
    # Python's % yields a non-negative result for a positive divisor, so the
    # offset always lands in [0, span - 1].
    return minimum + (current_time + self.random_seed) % span
109
110 def _change_my_mind(self) -> None:
111 """_summary_
112 This function is the one in charge of creating a random movement
113 """
114 move = self._my_randint(0, len(self.direction_options) - 1)
115 pinfo(
116 self.global_variables,
117 f"Selected direction: {self.direction_options_list[move]}"
118 )
120
def _update_ai_status(self, status: int) -> None:
    """Record a failing status in the shared AI status (one-way valve).

    A status different from ``global_variables.success`` is logged as a
    warning and stored in ``global_variables.ai_status``. A successful status
    never overwrites the stored value, so once an error has been recorded it
    stays recorded.

    The success message used to be logged unconditionally, including right
    after a failure had been reported. It is now logged only when the
    provided status is a success.

    Args:
        status (int): The run status of the previous function.
    """
    if status != self.global_variables.success:
        pwarning(
            self.global_variables,
            f"The global status if not a success, error code: {status}"
        )
        self.global_variables.ai_status = status
        # Stop here so the success message is not logged for a failure.
        return
    psuccess(self.global_variables, "The global status is a success.")
134
def _process_welcome(self, data: str) -> int:
    """Handle the first message of the initialisation sequence.

    When ``data`` contains the text ``WELCOME``, the configured name
    (``global_variables.user_arguments.name``) is appended to the response
    buffer and ``welcome_received`` is set. Otherwise the failure is reported
    through ``_exit_error``.

    Args:
        data (str): The content received from the server.

    Returns:
        int: ``global_variables.success`` when the welcome key was found,
        ``global_variables.error`` otherwise.
    """
    shared = self.global_variables
    if "WELCOME" not in data:
        pdebug(
            shared,
            f"(logistics): response = {data}, nb_responses = {self.nb_responses}"
        )
        self._exit_error("Did not receive the expected welcome message")
        return shared.error

    pinfo(shared, "Welcome key received")
    shared.response_buffer.append(shared.user_arguments.name)
    self.welcome_received = True
    return shared.success
157
158 def _process_client_number(self, data: str) -> int:
159 """_summary_
160 This function will check if we have received the client number
161 Args:
162 data (str): _description_: The incoming data
163
164 Returns:
165 int: _description_: The status of the processed data
166 """
167 if data in ({}, []):
168 response = f"Client number cannot be empty.\nYou entered:'{data}'"
169 perror(self.global_variables, response)
170 return self._exit_error(f"Error: {response}")
171 if isinstance(data, (dict)):
172 data = data[list(data)[0]]
173 if isinstance(data, list):
174 data = data[0]
175 if data.isnumeric():
176 self.client_number = int(data)
178 return self.global_variables.success
179
180 def _process_map_dimensions(self, data: str) -> int:
181 """_summary_
182 This is a function that will make sure that the dimensions of the map.
183 Args:
184 data (str): _description_
185
186 Returns:
187 int: _description_
188 """
189 if " " not in data:
190 response = f"Map dimensions cannot be empty.\nYou entered:'{data}'"
191 perror(self.global_variables, response)
192 return self._exit_error(f"Error: {response}")
193 dimensions = data.split(" ")
194 if len(dimensions) > 0 and dimensions[0] == '':
195 dimensions.pop(0)
196 if len(dimensions) > 0 and dimensions[-1] == '':
197 dimensions.pop(-1)
198 if len(dimensions) == 2:
199 if dimensions[0].isnumeric() and dimensions[1].isnumeric():
200 self.client_coordinates = (
201 int(dimensions[0]), int(dimensions[1]))
202 pdebug(
203 self.global_variables,
204 f"Client coordinates = {self.client_coordinates}"
205 )
206 self.initialisation_complete = True
207 self.global_variables.ai_ready = True
209 return self.global_variables.success
210 response = f"Map dimensions cannot be empty.\nYou entered:'{data}'"
211 perror(self.global_variables, response)
212 return self._exit_error(f"Error: {response}")
213
def _can_evolve(self, command: str) -> bool:
    """Tell whether the Look response contains the Linemate item.

    Args:
        command (str | list): The content of the Look response. It is
            checked with ``in`` against ``Items.LINEMATE``, ``"linemate"``
            and ``"Linemate"``, in that order.

    Returns:
        bool: True when one of the Linemate markers is found, False when the
        content is empty or no marker is present.
    """
    shared = self.global_variables
    size = len(command)
    pdebug(
        shared,
        f"(logistic) _can_evolve: max_i = {size}, Provided command = {command}"
    )
    if not size:
        pdebug(shared, "(logistics) _can_evolve: The command is empty")
        return False

    linemate_markers = (Items.LINEMATE, "linemate", "Linemate")
    if any(marker in command for marker in linemate_markers):
        psuccess(shared, "The item Linemate was found on the ground")
        return True

    pdebug(shared, "(logistics) _can_evolve: The Linemate item is not found")
    return False
245
def _exit_error(self, string: str) -> int:
    """Report a fatal error and ask the program to stop.

    Sets ``global_variables.continue_running`` to False, forwards the error
    status to ``_update_ai_status`` and appends ``"Error: <string>"``
    followed by a newline to the response buffer.

    Args:
        string (str): The error description to place in the response buffer.

    Returns:
        int: The error status defined in ``global_variables``.
    """
    shared = self.global_variables
    shared.continue_running = False
    error_code = shared.error
    self._update_ai_status(error_code)
    shared.response_buffer.append(f"Error: {string}\n")
    return error_code
258
def _append_look_command(self) -> None:
    """Queue the look command in the response buffer.

    The text registered for ``Commands.LOOK`` is passed through
    ``ConvertData.to_external()``, logged, then appended to
    ``global_variables.response_buffer``.
    """
    shared = self.global_variables
    command_text = shared.translation_reference.text_equivalence[Commands.LOOK]
    converter = ConvertData(
        data=command_text,
        error=shared.error,
        success=shared.success,
    )
    outgoing = converter.to_external()
    pinfo(shared, f"Command sent: {outgoing}")
    shared.response_buffer.append(outgoing)
271
272 def _to_human_readable(self, code: int) -> str:
273 """_summary_
274 Replace the code with the corresponding state.
275 Args:
276 code (int): _description_: The corresponding code
277
278 Returns:
279 str: _description_: The corresponding text that the human can understand
280 """
281 if code in self.global_variables.translation_reference.text_equivalence:
282 return self.global_variables.translation_reference.text_equivalence[code]
283 return code
284
285 def _calculate_next_move(self, response: dict[Commands, any]) -> int:
286 """_summary_
287 This function will calculate the next move based on the response from the server.
288
289 Args:
290 response (dict[Commands, any]): _description_: The query that came in from the tcp protocol.
291
292 Returns:
293 int: _description_: The status of the execution.
294 """
295 status = self.global_variables.success
296
297 pdebug(self.global_variables, f"Incoming response = {response}")
298
299 if self.sabotage is True:
300 self._left_left()
301
302 if response[list(response)[0]] in ("ko", "KO", "Ko", "kO"):
303 perror(
304 self.global_variables,
305 f"Command = '{self._to_human_readable(list(response)[0])}' failed"
306 )
308 return self.global_variables.success
309
310 if Commands.LOOK in response:
311 psuccess(self.global_variables, f"Look response = {response}")
312 self.tile_content = response[Commands.LOOK]
313 if self._can_evolve(response[Commands.LOOK]) is True:
314 e = ConvertData(
315 data=self.global_variables.translation_reference.text_equivalence[
316 Commands.INCANTATION
317 ],
318 error=self.global_variables.error,
319 success=self.global_variables.success
320 )
321 self.global_variables.response_buffer.append(e.to_external())
322 else:
323 self._change_my_mind()
324
325 elif Commands.LEFT in response and response[Commands.LEFT].lower() == "ok":
327
328 elif Commands.RIGHT in response and response[Commands.RIGHT].lower() == "ok":
330
331 elif Commands.FORWARD in response and response[Commands.FORWARD].lower() == "ok":
333
334 elif Commands.INCANTATION in response and response[Commands.INCANTATION].lower() == "ok":
335 self.sabotage = True
336 self._left_left()
337
338 else:
340
341 return status
342
def dispatcher(self, response: dict[Commands, any]) -> int:
    """Route an incoming response to the handler matching the current step.

    The response counter is incremented first. Responses 1 to 3 that carry
    ``Commands.UNKNOWN`` are handled as the welcome message, the client
    number and the map dimensions respectively. From the fourth response
    onwards, once ``initialisation_complete`` is set, the response goes to
    ``_calculate_next_move``. A response matching none of these cases leaves
    the status at ``global_variables.success``.

    Args:
        response (dict[Commands, any]): The converted response.

    Returns:
        int: The status returned by the selected handler.
    """
    shared = self.global_variables
    outcome = shared.success
    self.nb_responses += 1
    pinfo(shared, f"data = {response}")

    step = self.nb_responses
    is_handshake = Commands.UNKNOWN in response
    if is_handshake and step == 1:
        outcome = self._process_welcome(response[Commands.UNKNOWN])
    elif is_handshake and self.welcome_received is True and step == 2:
        # The whole response mapping is forwarded here, unlike the other
        # handshake steps which forward only the Commands.UNKNOWN payload.
        outcome = self._process_client_number(response)
    elif is_handshake and self.client_number_received is True and step == 3:
        outcome = self._process_map_dimensions(response[Commands.UNKNOWN])
    elif step >= 4 and self.initialisation_complete is True:
        outcome = self._calculate_next_move(response)

    # NOTE(review): assumed to run for every response rather than only in the
    # last branch; the listing this was recovered from lost its indentation.
    self._update_ai_status(outcome)
    return outcome
366
367
368if __name__ == "__main__":
369 print("This script is not meant to be run as main.")
370 NAME = "my_zappy"
371 IP = "0.0.0.0"
372 PORT = 4242
373 ERROR = 84
374 SUCCESS = 0
375 CLIENT_NUM = 1
376 MAP_X = 20
377 MAP_Y = 20
378 ASCEND = "ASCEND"
379 LEFT = "LEFT"
380 RIGHT = "RIGHT"
381 FORWARD = "FORWARD"
382 LIFE_LENGTH = 200
383 LIFE_INDEX = 0
385 success=0,
386 error=84,
387 ip="0.0.0.0",
388 port=4242,
389 name="my_zappy",
390 debug=True
391 )
392 GI.current_buffer.append(ConvertData("WELCOME").to_internal())
393 LI = Logistics(GI)
394 LI.start()
395 GI.colourise_output.init_pallet()
396 GI.colourise_error.init_pallet()
397 while GI.continue_running is True:
398 if len(GI.sender_data) == 0:
399 continue
400 if GI.response_buffer[0] == NAME:
401 GI.current_buffer.append(
402 ConvertData(f"{CLIENT_NUM}\n").to_internal()
403 )
404 GI.current_buffer.append(
405 ConvertData(f" {MAP_X} {MAP_Y}\n").to_external()
406 )
407 if GI.response_buffer[0] == FORWARD:
408 GI.current_buffer.append(ConvertData("ok\n").to_internal())
409 if GI.response_buffer[0] == LEFT:
410 GI.current_buffer.append(ConvertData("ok\n").to_internal())
411 if GI.response_buffer[0] == RIGHT:
412 GI.current_buffer.append(ConvertData("ok\n").to_internal())
413 if LIFE_INDEX == LIFE_LENGTH:
414 GI.current_buffer.append(ConvertData("Dead").to_internal())
415 LIFE_INDEX += 1
416 pinfo(GI, f"Program stopped with status: {GI.current_status}")
417 GI.colourise_output.unload_ressources()
418 sys.exit(GI.current_status)
None _append_look_command(self)
Definition logistics.py:259
None _change_my_mind(self)
Definition logistics.py:110
None _right(self)
Definition logistics.py:81
int _process_client_number(self, str data)
Definition logistics.py:158
None __init__(self, GlobalVariables global_variables)
Definition logistics.py:24
None _left(self)
Definition logistics.py:68
int _process_map_dimensions(self, str data)
Definition logistics.py:180
int dispatcher(self, dict[Commands, any] response)
Definition logistics.py:343
int _my_randint(self, int minimum=0, int maximum=3)
Definition logistics.py:94
None _forward(self)
Definition logistics.py:55
None _update_ai_status(self, int status)
Definition logistics.py:121
str _to_human_readable(self, int code)
Definition logistics.py:272
int _calculate_next_move(self, dict[Commands, any] response)
Definition logistics.py:285
int _process_welcome(self, str data)
Definition logistics.py:135
bool _can_evolve(self, str command)
Definition logistics.py:214
int _exit_error(self, str string)
Definition logistics.py:246
int _create_random_seed(self)
Definition logistics.py:44