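# Some Integrations: Unitree
# Gesture Recognition
#
# Gesture Recognition
#
# Enable gesture recognition: the script below reads the robot's camera stream, detects hand gestures with MediaPipe, and sends the matching motion command to the robot.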

import cv2
import numpy as np
 
# Bring up the 'Video' window early by showing an all-black placeholder frame.
height = 720  # placeholder frame height; adjust as needed
width = 1280  # placeholder frame width; adjust as needed
img = np.zeros(shape=(height, width, 3), dtype=np.uint8)
cv2.imshow('Video', img)
# A short waitKey call makes sure the window is actually created.
cv2.waitKey(1)
 
import json
 
from go2_webrtc_driver.constants import RTC_TOPIC, SPORT_CMD
import asyncio
import logging
import threading
import time
from queue import Queue
from go2_webrtc_driver.webrtc_driver import Go2WebRTCConnection, WebRTCConnectionMethod
from aiortc import MediaStreamTrack
import cv2
import mediapipe as mp
import numpy as np
 
# MediaPipe Hands initialization: module-level handles to the hands solution,
# its drawing helpers, and a detector configured for up to two hands.
mp_hands = mp.solutions.hands
mp_draw = mp.solutions.drawing_utils
hands = mp_hands.Hands(max_num_hands=2)
 
def normalizar_pontos(landmarks):
    """Center hand landmarks on the first point and scale them to unit size.

    Args:
        landmarks: Sequence of objects exposing ``x``, ``y`` and ``z``
            attributes. The first element is used as the origin (the wrist).

    Returns:
        A float ``numpy`` array of shape ``(len(landmarks), 3)`` with the
        coordinates expressed relative to the first landmark and divided by
        the largest distance from it. When every landmark coincides with the
        first one the scale is zero, and the all-zero array is returned
        unscaled instead of dividing by zero.
    """
    pontos = np.array([[lm.x, lm.y, lm.z] for lm in landmarks], dtype=float)
    # Copy the wrist row: plain indexing returns a view into ``pontos``, and
    # the in-place subtraction below would then read from the array it writes.
    centro = pontos[0].copy()
    pontos -= centro
    escala = np.linalg.norm(pontos, axis=1).max()
    # A degenerate hand (all points identical) has no size to normalize by.
    if escala > 0:
        pontos /= escala
    return pontos
 
# Finger-state pattern -> gesture name. Each key is ordered
# (index, middle, ring, pinky, thumb); True means the finger counts as extended.
_GESTOS_POR_DEDOS = {
    (True, True, True, True, True): 'open_hand',
    (True, True, False, False, False): 'peace',
    (True, False, False, False, False): 'finger',
    (True, False, False, True, False): 'rock',
    (True, False, False, False, True): 'heart',
    (False, True, False, False, False): 'middle',
    (True, False, False, True, True): 'spider',
    (True, True, False, True, False): 'stranger',
    (False, False, False, False, False): 'closed',
}


def detectar_gesto(pontos, margem=0.1):
    """Classify a hand pose from its 21 landmark coordinates.

    Args:
        pontos: Indexable collection of at least 21 points, each indexable as
            ``[x, y, z]`` (for example the array returned by
            ``normalizar_pontos``).
        margem: Relative tolerance applied to the reference joint coordinate
            before comparing it with the fingertip. Defaults to ``0.1`` (10%).

    Returns:
        One of ``'open_hand'``, ``'peace'``, ``'finger'``, ``'rock'``,
        ``'heart'``, ``'middle'``, ``'spider'``, ``'stranger'``, ``'closed'``,
        or ``'none'`` when the finger pattern matches no known gesture.

    Raises:
        IndexError: If ``pontos`` has fewer than 21 entries.
    """
    # Index, middle, ring and pinky: the fingertip landmark (8, 12, 16, 20)
    # counts as extended when its y is smaller than the y of the landmark two
    # indices earlier, scaled by (1 + margem).
    # NOTE(review): the margin is multiplicative, so its effect flips with the
    # sign of the reference coordinate -- confirm this is intended.
    dedos = tuple(
        bool(pontos[ponta][1] < pontos[ponta - 2][1] * (1 + margem))
        for ponta in (8, 12, 16, 20)
    )
    # Thumb: compared along x (tip 4 against landmark 3).
    # NOTE(review): an x-based test presumably depends on which hand is shown
    # and on mirroring -- verify against the camera setup.
    polegar = bool(pontos[4][0] > pontos[3][0] * (1 - margem))

    # Every gesture is an exact five-finger pattern, so a table lookup is
    # equivalent to testing the patterns one by one.
    return _GESTOS_POR_DEDOS.get(dedos + (polegar,), 'none')
 
# Configure logging once. The FATAL threshold hides every record below
# FATAL/CRITICAL (DEBUG, INFO, WARNING and ERROR); lower it while debugging.
# A second basicConfig call would be a no-op, so it is issued only here.
logging.basicConfig(level=logging.FATAL)


BREAK_TIME_OUT = 1.0  # Time in seconds (presumably the gesture cooldown -- TODO confirm)
 
async def main():
    """Connect to the robot, watch its camera for hand gestures and act on them.

    A background thread runs an asyncio event loop that owns the WebRTC
    connection and pushes decoded camera frames into a queue. This coroutine
    then stays in a synchronous OpenCV loop: it pops frames, runs MediaPipe
    Hands on them, classifies the gesture and schedules the matching robot
    command on the connection's own event loop. Press ``q`` in the 'Video'
    window to quit.

    Gesture mapping: ``open_hand`` sends a forward Move, ``closed`` sends a
    backward Move, ``peace`` runs the StandOut sequence and ``spider`` runs
    the BackFlip sequence. After a command is dispatched, further commands
    are suppressed for ``BREAK_TIME_OUT`` seconds.
    """
    frame_queue = Queue()
    cooldown_active = False  # Flag that prevents repeated actions
    cooldown_started_at = None

    # Choose a connection method (uncomment the correct one)
    # NOTE(review): the serial number and credentials below look like placeholders.
    # conn = Go2WebRTCConnection(WebRTCConnectionMethod.LocalSTA, ip="192.168.1.12")
    # conn = Go2WebRTCConnection(WebRTCConnectionMethod.LocalSTA, serialNumber="B42D20007BCAEG4")
    conn = Go2WebRTCConnection(WebRTCConnectionMethod.Remote, serialNumber="FFFFFFFFFFFFFFFFFFF", username="FFFFFFFF@Fffffffffff.com", password="ffffffffffffffffff")
    # conn = Go2WebRTCConnection(WebRTCConnectionMethod.LocalAP)

    mp_hands = mp.solutions.hands
    hands = mp_hands.Hands(
        min_detection_confidence=0.7, min_tracking_confidence=0.7, max_num_hands=1
    )

    # Async function to receive video frames and put them in the queue
    async def recv_camera_stream(track: MediaStreamTrack):
        while True:
            frame = await track.recv()
            # Convert the frame to a NumPy array
            img = frame.to_ndarray(format="bgr24")
            frame_queue.put(img)

    async def request(topic, payload):
        """Send one request on the robot's data channel and return the response."""
        return await conn.datachannel.pub_sub.publish_request_new(RTC_TOPIC[topic], payload)

    async def switch_motion_mode(name):
        """Ask the motion switcher (api_id 1002) to select the mode called ``name``."""
        await request("MOTION_SWITCHER", {"api_id": 1002, "parameter": {"name": name}})

    async def move(x):
        """Send a Move command with the given ``x`` value and zero ``y``/``z``."""
        await request(
            "SPORT_MOD",
            {"api_id": SPORT_CMD["Move"], "parameter": {"x": x, "y": 0, "z": 0}},
        )

    # Movement command: forward
    async def run():
        await move(0.2)

    # Movement command: backward
    async def back():
        await move(-0.2)

    async def stand():
        """Switch to "ai" mode, hold StandOut for five seconds, then return to "normal"."""
        await switch_motion_mode("ai")
        await request(
            "SPORT_MOD",
            {"api_id": SPORT_CMD["StandOut"], "parameter": {"data": True}},
        )

        await asyncio.sleep(5)

        await request(
            "SPORT_MOD",
            {"api_id": SPORT_CMD["StandOut"], "parameter": {"data": False}},
        )
        await switch_motion_mode("normal")

    async def flip():
        """Switch to "ai" mode, request a BackFlip, then return to "normal"."""
        await switch_motion_mode("ai")
        await request(
            "SPORT_MOD",
            {"api_id": SPORT_CMD["BackFlip"], "parameter": {"data": True}},
        )
        await switch_motion_mode("normal")

    def run_asyncio_loop(loop):
        """Thread target: connect, start the video stream, then serve the loop."""
        asyncio.set_event_loop(loop)

        async def setup():
            try:
                # Connect to the device
                await conn.connect()
                response = await request("MOTION_SWITCHER", {"api_id": 1001})

                # Stays None when the query reports a non-zero status code, so
                # the checks below never touch an unbound name.
                current_motion_switcher_mode = None
                if response['data']['header']['status']['code'] == 0:
                    data = json.loads(response['data']['data'])
                    current_motion_switcher_mode = data['name']
                    print(f"Current motion mode: {current_motion_switcher_mode}")

                if current_motion_switcher_mode is None:
                    print("Could not read the current motion mode; leaving it unchanged.")
                elif current_motion_switcher_mode != "normal":
                    # Switch to "normal" mode if not already
                    print(f"Switching motion mode from {current_motion_switcher_mode} to 'normal'...")
                    await switch_motion_mode("normal")
                    await asyncio.sleep(5)  # Wait while it stands up

                # Switch video channel on and start receiving video frames
                conn.video.switchVideoChannel(True)

                # Add callback to handle received video frames
                conn.video.add_track_callback(recv_camera_stream)
            except Exception as e:
                # Logged at CRITICAL because the module configures the root
                # logger with a FATAL threshold; an ERROR record would be
                # dropped and the failure would go unnoticed.
                logging.critical("Error in WebRTC connection: %s", e)

        # Run the setup coroutine and then start the event loop
        loop.run_until_complete(setup())
        loop.run_forever()

    # Gesture name -> coroutine function that performs the robot action.
    actions = {
        'open_hand': run,
        'closed': back,
        'peace': stand,
        'spider': flip,
    }

    # Create a new event loop for the asyncio code
    loop = asyncio.new_event_loop()

    # Start the asyncio event loop in a separate thread
    asyncio_thread = threading.Thread(target=run_asyncio_loop, args=(loop,))
    asyncio_thread.start()

    try:
        while True:
            if frame_queue.empty():
                # Sleep briefly to prevent high CPU usage
                time.sleep(0.01)
                continue

            img = frame_queue.get()
            logging.debug(
                "Shape: %s, Dimensions: %s, Type: %s, Size: %s",
                img.shape, img.ndim, img.dtype, img.size,
            )
            image_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            results = hands.process(image_rgb)

            if cooldown_active and time.monotonic() - cooldown_started_at > BREAK_TIME_OUT:
                cooldown_active = False

            # Draw the landmarks of each detected hand and execute actions
            if results.multi_hand_landmarks:
                for hand_landmarks in results.multi_hand_landmarks:
                    mp.solutions.drawing_utils.draw_landmarks(
                        img, hand_landmarks, mp_hands.HAND_CONNECTIONS
                    )
                    pontos = normalizar_pontos(hand_landmarks.landmark)
                    gesto = detectar_gesto(pontos)
                    action = actions.get(gesto)
                    if action is None or cooldown_active:
                        continue
                    # Commands run on the loop that owns the connection:
                    # asyncio objects are not thread-safe and must not be
                    # driven from a second event loop.
                    # NOTE(review): Move requests are now sent once per
                    # cooldown window rather than once per frame; lower
                    # BREAK_TIME_OUT if motion needs to be smoother.
                    asyncio.run_coroutine_threadsafe(action(), loop)
                    cooldown_active = True
                    cooldown_started_at = time.monotonic()

            # Display the frame
            cv2.imshow('Video', img)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    finally:
        cv2.destroyAllWindows()
        # Release the MediaPipe graph resources.
        hands.close()
        # Stop the asyncio event loop
        loop.call_soon_threadsafe(loop.stop)
        asyncio_thread.join()
 
 
# Script entry point: run the main() coroutine when executed directly.
if __name__ == "__main__":
    asyncio.run(main())