File size: 6,166 Bytes
b44cc91
 
e7efca5
 
 
 
b44cc91
 
e7efca5
 
 
 
b44cc91
 
 
 
 
e7efca5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b44cc91
e7efca5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b44cc91
e7efca5
 
 
 
 
 
 
 
 
 
 
 
b44cc91
e7efca5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import threading
import time
import asyncio
import json
from typing import List
from queue import Queue

import numpy as np
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
import uvicorn
from reachy_mini import ReachyMiniApp
from reachy_mini.reachy_mini import ReachyMini
from scipy.spatial.transform import Rotation as R


class ConnectionManager:
    """Track connected WebSocket clients and buffer state updates for them.

    State dictionaries are pushed from synchronous code with
    :meth:`queue_state`; coroutines pull them from ``state_queue`` and fan
    them out to every tracked client with :meth:`broadcast`.
    """

    def __init__(self):
        # Sockets that have been accepted and not yet dropped.
        self.active_connections: List[WebSocket] = []
        # Hand-off buffer between the synchronous producer and the
        # coroutines that broadcast (queue.Queue is safe across threads).
        self.state_queue: Queue = Queue()

    async def connect(self, websocket: WebSocket):
        """Accept the handshake on *websocket* and start tracking it."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking *websocket*.

        Calling this for a socket that is not tracked (for example because
        :meth:`broadcast` already pruned it after a failed send) is a no-op
        rather than a ``ValueError``.
        """
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: dict):
        """Send *message* as JSON to every tracked client.

        Clients whose send raises are considered gone and are dropped from
        ``active_connections``; the error itself is not propagated.
        """
        disconnected = []
        # Iterate over a snapshot: every ``await`` below yields to other
        # coroutines, which may add or remove connections in the meantime.
        for connection in list(self.active_connections):
            try:
                await connection.send_json(message)
            except Exception:
                # Best effort: any send failure marks the client as dead.
                disconnected.append(connection)

        # Clean up disconnected clients
        for conn in disconnected:
            self.disconnect(conn)

    def queue_state(self, state: dict):
        """Queue *state* for broadcasting; callable from a sync context."""
        self.state_queue.put(state)


class ExampleApp(ReachyMiniApp):
    """Demo app that animates the robot and streams the targets to a browser.

    ``run`` drives a sinusoidal head/antenna motion through
    ``reachy_mini.set_target`` and publishes each target on a WebSocket
    served by an embedded FastAPI/uvicorn server on 127.0.0.1:8000.
    """

    def __init__(self):
        super().__init__()
        self.app = FastAPI()
        self.manager = ConnectionManager()
        self.current_state = {}
        # Set while run() is winding down so that WebSocket handlers leave
        # their polling loop and the web server can finish shutting down.
        self._server_stopping = threading.Event()
        self.setup_routes()

    def setup_routes(self):
        """Register the index page and the state-streaming WebSocket."""

        @self.app.get("/")
        async def read_root():
            # Relative path: resolved against the process working directory.
            return FileResponse("index.html")

        @self.app.websocket("/api/state/ws/full")
        async def websocket_endpoint(websocket: WebSocket):
            await self.manager.connect(websocket)
            try:
                # Leave the loop once this socket has been dropped (a failed
                # send makes broadcast() prune it) or the app is stopping;
                # otherwise the handler would poll forever after the client
                # has gone away.
                while (
                    websocket in self.manager.active_connections
                    and not self._server_stopping.is_set()
                ):
                    # Check for new state data and broadcast.  There is no
                    # await between empty() and get(), so get() cannot block
                    # the event loop here.
                    if not self.manager.state_queue.empty():
                        state = self.manager.state_queue.get()
                        await self.manager.broadcast(state)
                    await asyncio.sleep(0.01)
            except WebSocketDisconnect:
                pass
            finally:
                # The socket may already have been pruned by broadcast().
                if websocket in self.manager.active_connections:
                    self.manager.disconnect(websocket)

    def run(self, reachy_mini: ReachyMini, stop_event: threading.Event):
        """Animate the robot until *stop_event* is set.

        Args:
            reachy_mini: Robot handle; only ``set_target`` is called on it.
            stop_event: Checked once per iteration (about every 20 ms); the
                loop ends as soon as it is set.

        The web server is started in a background thread on entry and asked
        to shut down on exit, so the port is released when the app stops.
        """
        self._server_stopping.clear()

        # Same server as uvicorn.run(), but keeping the Server object lets us
        # request a shutdown when the loop below ends.
        server = uvicorn.Server(
            uvicorn.Config(
                self.app, host="127.0.0.1", port=8000, log_level="info"
            )
        )
        server_thread = threading.Thread(target=server.run, daemon=True)
        server_thread.start()

        print("🚀 Web server started at http://127.0.0.1:8000")
        print("🔌 WebSocket available at ws://127.0.0.1:8000/api/state/ws/full")
        print("📄 Open http://127.0.0.1:8000 in your browser")
        print("Press Ctrl+C to stop\n")

        try:
            while not stop_event.is_set():
                # Sample the clock once so every term of this tick (and the
                # published timestamp) refers to the same instant.
                now = time.time()
                slow_wave = np.sin(2 * np.pi * 0.3 * now + np.pi)  # 0.3 Hz
                fast_wave = np.sin(2 * np.pi * 0.5 * now)  # 0.5 Hz

                # Rotation about z only, amplitude 0.5 rad.
                euler_rot = [0, 0, float(0.5 * slow_wave)]

                pose = np.eye(4)
                pose[:3, :3] = R.from_euler(
                    "xyz", euler_rot, degrees=False
                ).as_matrix()
                # z translation: sum of the two oscillations.
                pose[2, 3] = 0.005 * slow_wave + 0.01 * fast_wave

                antennas = np.array([1, 1]) * fast_wave

                reachy_mini.set_target(head=pose, antennas=antennas)

                # Only queue state while somebody is listening; otherwise
                # the queue would grow without bound with no consumer.
                if self.manager.active_connections:
                    self.manager.queue_state(
                        {
                            "timestamp": now,
                            "head_pose": pose.tolist(),
                            "antennas": antennas.tolist(),
                            "euler_rotation": euler_rot,
                        }
                    )

                time.sleep(0.02)
        except KeyboardInterrupt:
            pass
        finally:
            # Let the WebSocket handlers return, then stop uvicorn.  The join
            # is bounded so a stuck server cannot block the app from exiting.
            self._server_stopping.set()
            server.should_exit = True
            server_thread.join(timeout=5.0)


if __name__ == "__main__":
    # Standalone testing mode - run this directly to test the app with a
    # Reachy Mini running on the same machine (localhost).
    #
    # Usage:
    #     python reachy_mini_app_example/main.py
    #
    # Options:
    #     - localhost_only=True: Connect to Reachy on localhost
    #     - use_sim=True: Use simulation mode (if available)
    import signal

    # Create the app instance
    app = ExampleApp()

    # Create a stop event for clean shutdown
    stop_event = threading.Event()

    def signal_handler(sig, frame):
        """Request a clean stop on the first Ctrl+C.

        The default handler is restored first, so a second Ctrl+C raises
        KeyboardInterrupt and interrupts a shutdown that does not finish.
        """
        signal.signal(signal.SIGINT, signal.default_int_handler)
        print("\n🛑 Shutting down gracefully...")
        # The run loop polls this event and returns on its own; exiting the
        # interpreter from inside the handler would cut it off mid-iteration.
        stop_event.set()

    signal.signal(signal.SIGINT, signal_handler)

    # Connect to Reachy Mini on localhost
    print("🤖 Connecting to Reachy Mini...")
    try:
        try:
            # Connect to Reachy Mini (localhost_only=True connects to local robot)
            # Set use_sim=True if you want to test with simulation
            reachy_mini = ReachyMini(
                localhost_only=True,  # Connect to robot on localhost
                spawn_daemon=False,   # Don't spawn a new daemon (daemon already running)
                use_sim=False,        # Set to True for simulation mode
                timeout=5.0,
                log_level='INFO'
            )
        except Exception as e:
            # Only connection failures are reported here; errors raised while
            # the app is running are not connection problems and propagate.
            print(f"❌ Error connecting to Reachy Mini: {e}")
            print("\nMake sure:")
            print("  1. Reachy Mini daemon is running on this machine")
            print("  2. The Reachy Mini service is started")
            print("  3. Try setting use_sim=True for simulation mode")
        else:
            print("✅ Connected to Reachy Mini!")

            # Run the app
            app.run(reachy_mini, stop_event)
    except KeyboardInterrupt:
        print("\n🛑 Interrupted by user")
    finally:
        print("👋 Goodbye!")