Joystick
Enables controlling the robot with a virtual joystick.
import asyncio
import threading
import tkinter as tk
from math import atan2, cos, sin, sqrt
from go2_webrtc_driver.constants import RTC_TOPIC
from go2_webrtc_driver.webrtc_driver import Go2WebRTCConnection, WebRTCConnectionMethod
import json
import subprocess
import os
# Command name -> numeric api_id. The IDs are sent as the "api_id" field of
# requests published on RTC_TOPIC["SPORT_MOD"]; every entry also becomes one
# button in the GUI. Entry order is kept as-is because the reverse lookup by
# value returns the first key that matches.
SPORT_CMD = {
    "BalanceStand": 1002,
    "BackFlip": 1044,
    "BodyHeight": 1013,
    "Bound": 1304,
    "ContinuousGait": 1019,
    "Content": 1020,
    "CrossStep": 1302,
    "Dance1": 1022,
    "Dance2": 1023,
    "Damp": 1001,
    "EconomicGait": 1035,
    "Euler": 1007,
    "FingerHeart": 1036,
    "FootRaiseHeight": 1014,
    # NOTE(review): FreeWalk and LeadFollow both map to 1045, so the ID alone
    # cannot tell the two apart -- confirm against the robot's API table.
    "FreeWalk": 1045,
    "FrontFlip": 1030,
    "FrontJump": 1031,
    "FrontPounce": 1032,
    "GetBodyHeight": 1024,
    "GetFootRaiseHeight": 1025,
    "GetSpeedLevel": 1026,
    "GetState": 1034,
    "Handstand": 1301,
    "Hello": 1016,
    "LeadFollow": 1045,  # same ID as FreeWalk (see note above)
    "LeftFlip": 1042,
    "MoonWalk": 1305,
    "Move": 1008,  # the only entry sent with x/y/z parameters in this file
    "OnesidedStep": 1303,
    "Pose": 1028,
    "RecoveryStand": 1006,
    "RightFlip": 1043,
    "RiseSit": 1010,
    "Scrape": 1029,
    "Sit": 1009,
    "SpeedLevel": 1015,
    "StandDown": 1005,
    "StandOut": 1039,
    "StandUp": 1004,
    "StopMove": 1003,
    "Stretch": 1017,
    "SwitchGait": 1011,
    "SwitchJoystick": 1027,
    "TrajectoryFollow": 1018,
    "Trigger": 1012,
    "Wallow": 1021,
    "WiggleHips": 1033,
}
# Single connection object shared by every command in this file, created with
# the LocalAP connection method. It is only constructed here; connect() is
# awaited later, inside the asyncio thread's setup coroutine.
conn = Go2WebRTCConnection(WebRTCConnectionMethod.LocalAP)
async def send_command(api_id, parameter=None):
    """Publish one request on the sport-mode topic of the shared connection.

    Args:
        api_id: Numeric command identifier (a value from ``SPORT_CMD``).
        parameter: Optional payload for the command. Any falsy value
            (``None``, ``{}``, ...) is sent as an empty dict.

    Any exception raised while building or publishing the request is caught
    and printed, so a failed send never propagates to the caller.
    """
    print(f"Sending command: {api_id}, com parâmetros: {parameter}")
    try:
        publish = conn.datachannel.pub_sub.publish_request_new
        topic = RTC_TOPIC["SPORT_MOD"]
        request = {"api_id": api_id, "parameter": parameter or {}}
        await publish(topic, request)
    except Exception as exc:
        # Best effort: report the failure and carry on.
        print(f"Error sending command: {exc}")
def send_move_command(x, y, z):
    """Schedule a ``Move`` command on the background asyncio loop.

    Safe to call from the Tk thread: the coroutine is handed to the
    module-level ``loop`` via ``asyncio.run_coroutine_threadsafe`` and the
    returned future is discarded (fire-and-forget).

    Args:
        x, y, z: Values forwarded unchanged as the ``"x"``, ``"y"`` and
            ``"z"`` fields of the Move parameter dict.
    """
    move_request = send_command(SPORT_CMD["Move"], {"x": x, "y": y, "z": z})
    asyncio.run_coroutine_threadsafe(move_request, loop)
def send_action_command(api_id):
    """Schedule a parameterless sport command on the background asyncio loop.

    Safe to call from the Tk thread: the coroutine is handed to the
    module-level ``loop`` via ``asyncio.run_coroutine_threadsafe`` and the
    returned future is discarded (fire-and-forget).

    Args:
        api_id: Numeric command identifier (a value from ``SPORT_CMD``).
    """
    action_request = send_command(api_id)
    asyncio.run_coroutine_threadsafe(action_request, loop)
class VirtualJoystick(tk.Tk):
    """Main window: draggable joystick, one button per sport command, and a
    toggle that starts/stops an external gesture-recognition script.

    Dragging the knob sends a Move command on every motion event through
    ``send_move_command``; releasing it sends a zero Move. Each ``SPORT_CMD``
    entry gets a button that calls ``send_action_command`` with its ID.
    """

    # Seconds the GUI thread waits for the gesture-recognition subprocess to
    # exit after terminate() before escalating to kill(). Bounds how long a
    # button press or window close can block.
    RECON_STOP_TIMEOUT = 5

    def __init__(self):
        """Build all widgets and bind the joystick mouse handlers."""
        super().__init__()
        self.title("Virtual Joystick")
        self.geometry("600x800")
        self.resizable(False, False)

        self.main_frame = tk.Frame(self)
        self.main_frame.pack(expand=True)

        self.joystick_frame = tk.Frame(self.main_frame)
        self.joystick_frame.pack(pady=20)

        # Geometry in canvas pixels; the canvas is exactly as large as the
        # joystick base, so the base centre sits at (radius, radius).
        self.joystick_radius = 100
        self.knob_radius = 40
        self.center_x = self.joystick_radius
        self.center_y = self.joystick_radius
        self.knob_x = self.center_x
        self.knob_y = self.center_y
        self.active = False  # True only between knob press and release

        self.canvas = tk.Canvas(
            self.joystick_frame,
            width=2 * self.joystick_radius,
            height=2 * self.joystick_radius,
            highlightthickness=0,
        )
        self.canvas.pack()

        # Static base circle.
        self.canvas.create_oval(
            self.center_x - self.joystick_radius,
            self.center_y - self.joystick_radius,
            self.center_x + self.joystick_radius,
            self.center_y + self.joystick_radius,
            fill="gray", outline=""
        )
        # Movable knob; the mouse handlers are bound to this item only, so a
        # drag must start on the knob itself.
        self.knob = self.canvas.create_oval(
            self.knob_x - self.knob_radius,
            self.knob_y - self.knob_radius,
            self.knob_x + self.knob_radius,
            self.knob_y + self.knob_radius,
            fill="blue", outline=""
        )
        self.canvas.tag_bind(self.knob, "<ButtonPress-1>", self.on_press)
        self.canvas.tag_bind(self.knob, "<B1-Motion>", self.on_move)
        self.canvas.tag_bind(self.knob, "<ButtonRelease-1>", self.on_release)

        self.action_frame = tk.Frame(self.main_frame)
        self.action_frame.pack(pady=20)
        self.add_action_buttons()

        self.recon_button = tk.Button(
            self.main_frame,
            text="Reconhecimento de Gestos",
            width=30,
            command=self.toggle_recon,
        )
        self.recon_button.pack(pady=10)

        self.status_label = tk.Label(self.main_frame, text="Pronto", font=("Arial", 14))
        self.status_label.pack(pady=10)

        # Popen handle of the gesture-recognition script, or None when idle.
        self.recon_process = None

    def add_action_buttons(self):
        """Create one button per ``SPORT_CMD`` entry, alphabetically, 4 per row."""
        buttons_per_row = 4
        for index, cmd_name in enumerate(sorted(SPORT_CMD.keys())):
            api_id = SPORT_CMD[cmd_name]
            display_name = cmd_name.replace("_", " ").capitalize()
            row, column = divmod(index, buttons_per_row)
            # Bind api_id and the label as lambda defaults to avoid the
            # late-binding closure trap. The label is passed along because
            # several names can share one ID, so the ID alone cannot say
            # which button was pressed.
            button = tk.Button(
                self.action_frame,
                text=display_name,
                width=15,
                command=lambda api_id=api_id, name=display_name: self.on_action_press(api_id, name),
            )
            button.grid(row=row, column=column, padx=5, pady=5)

    def on_press(self, event):
        """Start a drag: mark the joystick as active."""
        self.active = True

    def on_move(self, event):
        """Follow the pointer with the knob and send the matching Move command.

        The knob centre is clamped to ``joystick_radius - knob_radius`` from
        the base centre so the knob never leaves the base circle. The offset
        is normalised by that same distance, giving values in [-1, 1].
        """
        if not self.active:
            return

        travel = self.joystick_radius - self.knob_radius
        dx = event.x - self.center_x
        dy = event.y - self.center_y
        distance = sqrt(dx**2 + dy**2)
        if distance > travel:
            # Project the pointer back onto the travel circle.
            angle = atan2(dy, dx)
            dx = travel * cos(angle)
            dy = travel * sin(angle)

        new_x = self.center_x + dx
        new_y = self.center_y + dy
        self.canvas.coords(
            self.knob,
            new_x - self.knob_radius,
            new_y - self.knob_radius,
            new_x + self.knob_radius,
            new_y + self.knob_radius
        )

        normalized_x = dx / travel
        normalized_y = dy / travel
        # Canvas y grows downward; flip it so pushing the knob up is positive.
        normalized_y = -normalized_y

        # Vertical deflection goes to Move "x", negated horizontal deflection
        # to Move "z", and "y" is always 0.
        # NOTE(review): presumably x is forward speed and z is yaw rate --
        # confirm against the robot's Move API.
        send_move_command(normalized_y, 0, -normalized_x)
        self.status_label.config(text=f"Movendo: x={normalized_x:.2f}, y={normalized_y:.2f}")

    def on_release(self, event):
        """End a drag: recentre the knob and send a zero Move command."""
        self.active = False
        self.canvas.coords(
            self.knob,
            self.center_x - self.knob_radius,
            self.center_y - self.knob_radius,
            self.center_x + self.knob_radius,
            self.center_y + self.knob_radius
        )
        send_move_command(0, 0, 0)
        self.status_label.config(text="Parado")

    def on_action_press(self, api_id, name=None):
        """Send a sport command and show it in the status label.

        Args:
            api_id: Numeric command identifier to send.
            name: Optional display name of the command. When omitted, the
                name is looked up from ``api_id``, which is ambiguous for IDs
                shared by several commands.
        """
        send_action_command(api_id)
        acao = name if name is not None else self.get_action_name(api_id)
        self.status_label.config(text=f"Executando: {acao}")

    def get_action_name(self, api_id):
        """Return the display name of the first ``SPORT_CMD`` key mapped to
        ``api_id``, or ``"Ação Desconhecida"`` when no key matches.

        When several keys share the ID, only the first in dict order is
        returned.
        """
        for key, value in SPORT_CMD.items():
            if value == api_id:
                return key.replace("_", " ").capitalize()
        return "Ação Desconhecida"

    def toggle_recon(self):
        """Start gesture recognition if it is not running, otherwise stop it."""
        # poll() returning non-None means a previously started process has
        # already exited on its own, so it counts as "not running".
        if self.recon_process is None or self.recon_process.poll() is not None:
            self.start_recon()
        else:
            self.stop_recon()

    def start_recon(self):
        """Launch the gesture-recognition script as a ``python3`` subprocess.

        The script path is resolved relative to the current working directory.
        """
        script_path = os.path.abspath("kaircode/Outros/Recon.py")
        self.recon_process = subprocess.Popen(["python3", script_path])
        self.status_label.config(text="Reconhecimento de Gestos Ativado")
        self.recon_button.config(text="Desativar Reconhecimento de Gestos")

    def _terminate_recon(self):
        """Stop the gesture-recognition subprocess, if any, and clear the handle.

        Sends terminate() and waits up to ``RECON_STOP_TIMEOUT`` seconds; a
        process that has not exited by then is killed, so the GUI thread
        cannot block indefinitely.
        """
        proc = self.recon_process
        if proc is None:
            return
        proc.terminate()
        try:
            proc.wait(timeout=self.RECON_STOP_TIMEOUT)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
        self.recon_process = None

    def stop_recon(self):
        """Stop gesture recognition and reset the related widgets."""
        self._terminate_recon()
        self.status_label.config(text="Reconhecimento de Gestos Desativado")
        self.recon_button.config(text="Reconhecimento de Gestos")

    def on_close(self):
        """Window-close handler: stop the subprocess, destroy the window, and
        shut down the background asyncio loop.

        Relies on the module-level ``loop`` and ``asyncio_thread`` created by
        the script's ``__main__`` section.
        """
        self._terminate_recon()
        self.destroy()
        # loop.stop must be scheduled from this thread via the thread-safe
        # API because the loop runs on the background thread.
        loop.call_soon_threadsafe(loop.stop)
        asyncio_thread.join()
def run_asyncio_loop(loop):
    """Thread target: connect to the robot, then serve ``loop`` until stopped.

    Installs ``loop`` as the current event loop of the calling thread, runs a
    one-off setup coroutine to completion, and then blocks in
    ``loop.run_forever()`` so coroutines submitted from other threads can run.

    Args:
        loop: The asyncio event loop this thread should own and run.
    """
    asyncio.set_event_loop(loop)

    async def prepare_robot():
        # Connect, query the motion switcher (api_id 1001), and request the
        # "normal" mode (api_id 1002) if the reported mode name differs.
        # Any failure is printed and swallowed so the loop still starts.
        try:
            await conn.connect()
            reply = await conn.datachannel.pub_sub.publish_request_new(
                RTC_TOPIC["MOTION_SWITCHER"],
                {"api_id": 1001},
            )
            if reply['data']['header']['status']['code'] != 0:
                return

            # The payload is a JSON string whose "name" field is the mode.
            motion_mode = json.loads(reply['data']['data'])['name']
            print(f"Current motion mode: {motion_mode}")
            if motion_mode == "normal":
                return

            await conn.datachannel.pub_sub.publish_request_new(
                RTC_TOPIC["MOTION_SWITCHER"],
                {"api_id": 1002, "parameter": {"name": "normal"}},
            )
        except Exception as exc:
            print(f"Error: {exc}")

    loop.run_until_complete(prepare_robot())
    loop.run_forever()
if __name__ == "__main__":
    # `loop` and `asyncio_thread` are deliberately module-level names: the
    # command helpers and the window's close handler look them up as globals.
    loop = asyncio.new_event_loop()
    asyncio_thread = threading.Thread(
        target=run_asyncio_loop,
        args=(loop,),
        daemon=True,  # do not keep the process alive if the GUI exits first
    )
    asyncio_thread.start()

    # Tk runs on the main thread; closing the window goes through on_close.
    app = VirtualJoystick()
    app.protocol("WM_DELETE_WINDOW", app.on_close)
    app.mainloop()