Some Integrations Unitree
VideoGame Controller

Controller

Enable Use of a videogame controller(like PS4) to control the robot.

import asyncio
import threading
import pygame
from time import time
from go2_webrtc_driver.constants import RTC_TOPIC, SPORT_CMD
from go2_webrtc_driver.webrtc_driver import Go2WebRTCConnection, WebRTCConnectionMethod
import ctypes
import json
 
conn = Go2WebRTCConnection(WebRTCConnectionMethod.LocalAP)
conn = Go2WebRTCConnection(WebRTCConnectionMethod.Remote, serialNumber="FFFFFFFFFFFFFFFFFFF", username="FFFFFFFF@Fffffffffff.com", password="ffffffffffffffffff")
 
pygame.init()
pygame.joystick.init()
screen = pygame.display.set_mode((0, 0), pygame.NOFRAME)  # Sem bordas, com transparência
pygame.display.set_caption("Janela Transparente")
hwnd = pygame.display.get_wm_info()["window"]
 
joystick = pygame.joystick.Joystick(0)  # Primeiro controle conectado
joystick.init()
 
movement_active = {"forward": False, "backward": False, "rotate_left": False, "rotate_right": False}
movement_tasks = {}
is_moving = False  # Indica se o robô está se movendo
is_executing_action = False  # Indica se o robô está executando uma ação
 
async def send_command(api_id, parameter=None):
    print(f"Sending command: {api_id}, com parâmetros: {parameter}")
    await conn.datachannel.pub_sub.publish_request_new(
        RTC_TOPIC["SPORT_MOD"],
        {"api_id": api_id, "parameter": parameter if parameter else {}}
    )
 
async def send_data(api_id, parameter=None):
    print(f"Sending command: {api_id}, com parâmetros: {parameter}")
    await conn.datachannel.pub_sub.publish_request_new(
        api_id, parameter
    )
    threading.Timer(2, reset_is_executing_action).start()
 
async def move_device(x, y, z):
    await send_command(SPORT_CMD["Move"], {"x": x, "y": y, "z": z})
 
async def continuous_movement(direction, x, y, z):
    while movement_active[direction]:
        await move_device(x, y, z)
 
def reset_is_executing_action():
    global is_executing_action
    is_executing_action = False
    print("Action concluded. Ready for next command.")
 
action_durations = {
    SPORT_CMD["StandUp"]: 1.0,
    SPORT_CMD["Sit"]: 1.0,
    SPORT_CMD["WiggleHips"]: 5.0,
    SPORT_CMD["Dance1"]: 10.0,
}
 
def send_action_command(api_id):
    global is_executing_action
    if is_executing_action:
        print(api_id)
        print("There is already an action in execution. Wait before sending another command.")
        return
    is_executing_action = True
    asyncio.run_coroutine_threadsafe(send_command(api_id), loop)
    duration = action_durations.get(api_id, 2.0)
    threading.Timer(duration, reset_is_executing_action).start()
 
def update_is_moving():
    global is_moving
    is_moving = any(movement_active.values())
 
def handle_joystick_events():
    global movement_active, movement_tasks, is_moving, is_executing_action
 
    axis_forward = joystick.get_axis(1)  # Eixo Y do analógico esquerdo
    axis_rotation = joystick.get_axis(0)  # Eixo X do analógico esquerdo
 
    axis_right_x = joystick.get_axis(2)  # Eixo X do analógico direito
    axis_right_y = joystick.get_axis(3)  
    # Movimento para frente
    if axis_forward < -0.1:
        if not movement_active["forward"]:
            movement_active["forward"] = True
            movement_tasks["forward"] = asyncio.run_coroutine_threadsafe(
                continuous_movement("forward", 1, 0, 0), loop
            )
    else:
        if movement_active["forward"]:
            movement_active["forward"] = False
 
    if axis_forward > 0.1:
        if not movement_active["backward"]:
            movement_active["backward"] = True
            movement_tasks["backward"] = asyncio.run_coroutine_threadsafe(
                continuous_movement("backward", -1, 0, 0), loop
            )
    else:
        if movement_active["backward"]:
            movement_active["backward"] = False
 
    if axis_right_y < -0.1:
        if not movement_active["rotate_left"]:
            movement_active["rotate_left"] = True
            movement_tasks["rotate_left"] = asyncio.run_coroutine_threadsafe(
                continuous_movement("rotate_left", 0, 0, 1), loop
            )
    else:
        if movement_active["rotate_left"]:
            movement_active["rotate_left"] = False
 
    if axis_right_y > 0.1:
        if not movement_active["rotate_right"]:
            movement_active["rotate_right"] = True
            movement_tasks["rotate_right"] = asyncio.run_coroutine_threadsafe(
                continuous_movement("rotate_right", 0, 0, -1), loop
            )
    else:
        if movement_active["rotate_right"]:
            movement_active["rotate_right"] = False
 
    update_is_moving()
 
    if joystick.get_button(0):  # Botão A
        send_action_command(SPORT_CMD["StandUp"])
    if joystick.get_button(1):  # Botão B
        send_action_command(SPORT_CMD["StandDown"])
    if joystick.get_button(2):  # Botão X
        send_action_command(SPORT_CMD["WiggleHips"])
    if joystick.get_button(3):  # Botão Y
        send_action_command(SPORT_CMD["FingerHeart"])
 
    lb_pressed = joystick.get_button(4)  # LB
    rb_pressed = joystick.get_button(5)  # RB
    lt_value = joystick.get_axis(2)  # LT
    rt_value = joystick.get_axis(5)  # RT
    hat_state = joystick.get_hat(0)  # Setas
 
    if hat_state == (0, -1): 
        send_action_command(SPORT_CMD["Sit"])
    if hat_state == (0, 1): 
        send_action_command(SPORT_CMD["Hello"])
    if hat_state == (-1, 0): 
        send_action_command(SPORT_CMD["WiggleHips"])
    if hat_state == (1, 0):  
        send_action_command(SPORT_CMD["FrontJump"])
 
init = 'normal'
def run_asyncio_loop():
    async def setup():
        await conn.connect()
        response = await conn.datachannel.pub_sub.publish_request_new(
            RTC_TOPIC["MOTION_SWITCHER"], 
            {"api_id": 1001}
        )
 
        if response['data']['header']['status']['code'] == 0:
            data = json.loads(response['data']['data'])
            current_motion_switcher_mode = data['name']
            print(f"Current motion mode: {current_motion_switcher_mode}")
        if current_motion_switcher_mode != init:
            print(f"Switching motion mode from {current_motion_switcher_mode} to '{init}'...")
            await conn.datachannel.pub_sub.publish_request_new(
                RTC_TOPIC["MOTION_SWITCHER"], 
                {
                    "api_id": 1002,
                    "parameter": {"name": init}
                }
            )
            await asyncio.sleep(5)
    loop.run_until_complete(setup())
    loop.run_forever()
 
loop = asyncio.new_event_loop()
asyncio_thread = threading.Thread(target=run_asyncio_loop, daemon=True)
asyncio_thread.start()
 
screen = pygame.display.set_mode((400, 300))
pygame.display.set_caption("Controlador do Dispositivo")
 
running = True
try:
    while running:
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False
                break
 
        handle_joystick_events()
        pygame.display.update()
 
finally:
    loop.call_soon_threadsafe(loop.stop)
    async def wait_for_loop():
        await asyncio.sleep(1)
    asyncio.run(wait_for_loop())
    pygame.quit()