import asyncio
import http.server
import json
import socketserver
import threading
from concurrent.futures import ThreadPoolExecutor
import keyboard
from websockets import ConnectionClosedError, ConnectionClosedOK
from websockets.server import serve
from universe.ants import Ant
from universe.engine import run
from universe.update import Update, UpdateType
HTTP_PORT = 80
[docs]
def start_http_server():
"""Start a simple HTTP server to serve the frontend."""
http_handler = http.server.SimpleHTTPRequestHandler
with socketserver.TCPServer(("", HTTP_PORT), http_handler) as httpd:
print(
f"Serving HTTP server at port {HTTP_PORT} - http://localhost:{HTTP_PORT} for local deployment"
)
httpd.serve_forever()
[docs]
async def handler(websocket):
"""
Handle the websocket connection.
:param websocket: The websocket connection.
:type websocket: websockets.WebSocketServerProtocol
"""
running_task = None
async def callback(
update_type: UpdateType, entity: Ant | None = None, target=None, state=None
):
"""
Send the update to the client.
:param update_type: The update type.
:type update_type: UpdateType
:param entity: The entity.
:type entity: Ant | None
:param target: The target.
:type target: Any
:param state: The state.
:type state: Any
"""
event = Update(update_type, entity, target, state).to_dict()
try:
await websocket.send(json.dumps(event))
except ConnectionClosedOK:
pass
except ConnectionClosedError:
pass
config = {}
try:
async for message in websocket:
data = json.loads(message)
print(f"Received: {data}")
print(UpdateType[data["type"]])
if UpdateType[data["type"]] == UpdateType.SIMULATION_START:
if running_task:
print("Canceling running simulation")
running_task.cancel()
config["pause"] = False
running_task = asyncio.create_task(run(config, callback))
elif UpdateType[data["type"]] == UpdateType.SIMULATION_SET_BOUNDARIES:
config["boundary"] = {"width": data["width"], "height": data["height"]}
elif UpdateType[data["type"]] == UpdateType.SIMULATION_SET_TPS:
if "tps" in data and isinstance(data["tps"], int) and data["tps"] > 0:
config["tps"] = data["tps"]
else:
await websocket.send(json.dumps({"type": "ERROR_INVALID_TPS"}))
elif UpdateType[data["type"]] == UpdateType.SIMULATION_SET_ROUNDS:
if (
"rounds" in data
and isinstance(data["rounds"], int)
and data["rounds"] > 0
):
config["rounds"] = data["rounds"]
else:
await websocket.send(json.dumps({"type": "ERROR_INVALID_ROUNDS"}))
elif UpdateType[data["type"]] == UpdateType.SIMULATION_END:
if running_task:
running_task.cancel()
running_task = None
await websocket.send(json.dumps({"type": "SIMULATION_END"}))
config.clear()
else:
await websocket.send(
json.dumps({"type": "ERROR_SIMULATION_NOT_RUNNING"})
)
elif UpdateType[data["type"]] == UpdateType.SIMULATION_PAUSE:
if running_task:
config["pause"] = True
await websocket.send(json.dumps({"type": "SIMULATION_PAUSE"}))
else:
await websocket.send(
json.dumps({"type": "ERROR_SIMULATION_NOT_RUNNING"})
)
elif UpdateType[data["type"]] == UpdateType.SIMULATION_RESUME:
if running_task:
config["pause"] = False
await websocket.send(json.dumps({"type": "SIMULATION_RESUME"}))
else:
await websocket.send(
json.dumps({"type": "ERROR_SIMULATION_NOT_RUNNING"})
)
elif UpdateType[data["type"]] == UpdateType.SIMULATION_SET_SEED:
config["seed"] = data["seed"]
else:
print("Unknown command")
except ConnectionClosedOK:
pass
except ConnectionClosedError:
pass
# This is a dummy function that does nothing and is used to replace the update_callback in the run function
[docs]
async def do_nothing(*args, **kwargs):
"""Do nothing."""
pass
executor = ThreadPoolExecutor(max_workers=1)
[docs]
def run_simulation(running_task, config):
"""
Run the simulation.
:param running_task: The running task.
:type running_task: asyncio.Task
:param config: The configuration.
:type config: dict
"""
if running_task:
print("Canceling running simulation")
running_task.cancel()
config["pause"] = False
executor.submit(asyncio.run, run(config, do_nothing))
[docs]
def stop_simulation(running_task):
"""
Stop the simulation.
:param running_task: The running task.
:type running_task: asyncio.Task
"""
if running_task:
print("Canceling running simulation")
running_task.cancel()
[docs]
def start_console_mode():
"""Start the console mode."""
print("Console mode started")
keyboard.remove_hotkey("ctrl+shift+w")
running_task = None
config = {}
keyboard.add_hotkey("ctrl+shift+s", run_simulation, args=(running_task, config))
keyboard.add_hotkey("ctrl+shift+x", stop_simulation, args=(running_task,))
keyboard.add_hotkey(
"ctrl+shift+d",
lambda: config.update({"console_map": not config.get("console_map", False)}),
)
print("Press ctrl+shift+s to start simulation")
print("Press ctrl+shift+x to stop simulation")
print("Press ctrl+shift+d to toggle console map of ants")
[docs]
async def start_servers():
"""Start the HTTP and WebSocket servers."""
http_thread = threading.Thread(target=start_http_server, daemon=True)
try:
http_thread.start()
print("Connect to ws://localhost:8765")
# Allow to start the simulation without opening the browser
# try:
# keyboard.add_hotkey("ctrl+shift+w", start_console_mode)
# except OSError:
# print("Cannot start console mode, please run the application as an administrator")
async with serve(handler, "0.0.0.0", 8765):
await asyncio.Future() # run forever
except asyncio.CancelledError:
print("Server stopped")
except Exception as e:
print(f"An error occurred: {e}")
print("Application will restart in 5 seconds")
await asyncio.sleep(5)
await start_servers()
[docs]
def main():
"""Start the main function."""
asyncio.run(start_servers())
if __name__ == "__main__":
main()