Controller
Enable Use of a videogame controller(like PS4) to control the robot.
import asyncio
import threading
import pygame
from time import time
from go2_webrtc_driver.constants import RTC_TOPIC, SPORT_CMD
from go2_webrtc_driver.webrtc_driver import Go2WebRTCConnection, WebRTCConnectionMethod
import ctypes
import json
conn = Go2WebRTCConnection(WebRTCConnectionMethod.LocalAP)
conn = Go2WebRTCConnection(WebRTCConnectionMethod.Remote, serialNumber="FFFFFFFFFFFFFFFFFFF", username="FFFFFFFF@Fffffffffff.com", password="ffffffffffffffffff")
pygame.init()
pygame.joystick.init()
screen = pygame.display.set_mode((0, 0), pygame.NOFRAME) # Sem bordas, com transparência
pygame.display.set_caption("Janela Transparente")
hwnd = pygame.display.get_wm_info()["window"]
joystick = pygame.joystick.Joystick(0) # Primeiro controle conectado
joystick.init()
movement_active = {"forward": False, "backward": False, "rotate_left": False, "rotate_right": False}
movement_tasks = {}
is_moving = False # Indica se o robô está se movendo
is_executing_action = False # Indica se o robô está executando uma ação
async def send_command(api_id, parameter=None):
print(f"Sending command: {api_id}, com parâmetros: {parameter}")
await conn.datachannel.pub_sub.publish_request_new(
RTC_TOPIC["SPORT_MOD"],
{"api_id": api_id, "parameter": parameter if parameter else {}}
)
async def send_data(api_id, parameter=None):
print(f"Sending command: {api_id}, com parâmetros: {parameter}")
await conn.datachannel.pub_sub.publish_request_new(
api_id, parameter
)
threading.Timer(2, reset_is_executing_action).start()
async def move_device(x, y, z):
await send_command(SPORT_CMD["Move"], {"x": x, "y": y, "z": z})
async def continuous_movement(direction, x, y, z):
while movement_active[direction]:
await move_device(x, y, z)
def reset_is_executing_action():
global is_executing_action
is_executing_action = False
print("Action concluded. Ready for next command.")
action_durations = {
SPORT_CMD["StandUp"]: 1.0,
SPORT_CMD["Sit"]: 1.0,
SPORT_CMD["WiggleHips"]: 5.0,
SPORT_CMD["Dance1"]: 10.0,
}
def send_action_command(api_id):
global is_executing_action
if is_executing_action:
print(api_id)
print("There is already an action in execution. Wait before sending another command.")
return
is_executing_action = True
asyncio.run_coroutine_threadsafe(send_command(api_id), loop)
duration = action_durations.get(api_id, 2.0)
threading.Timer(duration, reset_is_executing_action).start()
def update_is_moving():
global is_moving
is_moving = any(movement_active.values())
def handle_joystick_events():
global movement_active, movement_tasks, is_moving, is_executing_action
axis_forward = joystick.get_axis(1) # Eixo Y do analógico esquerdo
axis_rotation = joystick.get_axis(0) # Eixo X do analógico esquerdo
axis_right_x = joystick.get_axis(2) # Eixo X do analógico direito
axis_right_y = joystick.get_axis(3)
# Movimento para frente
if axis_forward < -0.1:
if not movement_active["forward"]:
movement_active["forward"] = True
movement_tasks["forward"] = asyncio.run_coroutine_threadsafe(
continuous_movement("forward", 1, 0, 0), loop
)
else:
if movement_active["forward"]:
movement_active["forward"] = False
if axis_forward > 0.1:
if not movement_active["backward"]:
movement_active["backward"] = True
movement_tasks["backward"] = asyncio.run_coroutine_threadsafe(
continuous_movement("backward", -1, 0, 0), loop
)
else:
if movement_active["backward"]:
movement_active["backward"] = False
if axis_right_y < -0.1:
if not movement_active["rotate_left"]:
movement_active["rotate_left"] = True
movement_tasks["rotate_left"] = asyncio.run_coroutine_threadsafe(
continuous_movement("rotate_left", 0, 0, 1), loop
)
else:
if movement_active["rotate_left"]:
movement_active["rotate_left"] = False
if axis_right_y > 0.1:
if not movement_active["rotate_right"]:
movement_active["rotate_right"] = True
movement_tasks["rotate_right"] = asyncio.run_coroutine_threadsafe(
continuous_movement("rotate_right", 0, 0, -1), loop
)
else:
if movement_active["rotate_right"]:
movement_active["rotate_right"] = False
update_is_moving()
if joystick.get_button(0): # Botão A
send_action_command(SPORT_CMD["StandUp"])
if joystick.get_button(1): # Botão B
send_action_command(SPORT_CMD["StandDown"])
if joystick.get_button(2): # Botão X
send_action_command(SPORT_CMD["WiggleHips"])
if joystick.get_button(3): # Botão Y
send_action_command(SPORT_CMD["FingerHeart"])
lb_pressed = joystick.get_button(4) # LB
rb_pressed = joystick.get_button(5) # RB
lt_value = joystick.get_axis(2) # LT
rt_value = joystick.get_axis(5) # RT
hat_state = joystick.get_hat(0) # Setas
if hat_state == (0, -1):
send_action_command(SPORT_CMD["Sit"])
if hat_state == (0, 1):
send_action_command(SPORT_CMD["Hello"])
if hat_state == (-1, 0):
send_action_command(SPORT_CMD["WiggleHips"])
if hat_state == (1, 0):
send_action_command(SPORT_CMD["FrontJump"])
init = 'normal'
def run_asyncio_loop():
async def setup():
await conn.connect()
response = await conn.datachannel.pub_sub.publish_request_new(
RTC_TOPIC["MOTION_SWITCHER"],
{"api_id": 1001}
)
if response['data']['header']['status']['code'] == 0:
data = json.loads(response['data']['data'])
current_motion_switcher_mode = data['name']
print(f"Current motion mode: {current_motion_switcher_mode}")
if current_motion_switcher_mode != init:
print(f"Switching motion mode from {current_motion_switcher_mode} to '{init}'...")
await conn.datachannel.pub_sub.publish_request_new(
RTC_TOPIC["MOTION_SWITCHER"],
{
"api_id": 1002,
"parameter": {"name": init}
}
)
await asyncio.sleep(5)
loop.run_until_complete(setup())
loop.run_forever()
loop = asyncio.new_event_loop()
asyncio_thread = threading.Thread(target=run_asyncio_loop, daemon=True)
asyncio_thread.start()
screen = pygame.display.set_mode((400, 300))
pygame.display.set_caption("Controlador do Dispositivo")
running = True
try:
while running:
for event in pygame.event.get():
if event.type == pygame.QUIT:
running = False
break
handle_joystick_events()
pygame.display.update()
finally:
loop.call_soon_threadsafe(loop.stop)
async def wait_for_loop():
await asyncio.sleep(1)
asyncio.run(wait_for_loop())
pygame.quit()