File size: 6,166 Bytes
b44cc91 e7efca5 b44cc91 e7efca5 b44cc91 e7efca5 b44cc91 e7efca5 b44cc91 e7efca5 b44cc91 e7efca5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
import threading
import time
import asyncio
import json
from typing import List
from queue import Queue
import numpy as np
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
import uvicorn
from reachy_mini import ReachyMiniApp
from reachy_mini.reachy_mini import ReachyMini
from scipy.spatial.transform import Rotation as R
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
self.state_queue: Queue = Queue()
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def broadcast(self, message: dict):
disconnected = []
for connection in self.active_connections:
try:
await connection.send_json(message)
except Exception:
disconnected.append(connection)
# Clean up disconnected clients
for conn in disconnected:
if conn in self.active_connections:
self.active_connections.remove(conn)
def queue_state(self, state: dict):
"""Queue state from sync context"""
self.state_queue.put(state)
class ExampleApp(ReachyMiniApp):
def __init__(self):
super().__init__()
self.app = FastAPI()
self.manager = ConnectionManager()
self.current_state = {}
self.setup_routes()
def setup_routes(self):
@self.app.get("/")
async def read_root():
return FileResponse("index.html")
@self.app.websocket("/api/state/ws/full")
async def websocket_endpoint(websocket: WebSocket):
await self.manager.connect(websocket)
try:
while True:
# Check for new state data and broadcast
if not self.manager.state_queue.empty():
state = self.manager.state_queue.get()
await self.manager.broadcast(state)
await asyncio.sleep(0.01)
except WebSocketDisconnect:
self.manager.disconnect(websocket)
def run(self, reachy_mini: ReachyMini, stop_event: threading.Event):
# Start FastAPI server in a separate thread
def start_server():
uvicorn.run(self.app, host="127.0.0.1", port=8000, log_level="info")
server_thread = threading.Thread(target=start_server, daemon=True)
server_thread.start()
print("π Web server started at http://127.0.0.1:8000")
print("π WebSocket available at ws://127.0.0.1:8000/api/state/ws/full")
print("π Open http://127.0.0.1:8000 in your browser")
print("Press Ctrl+C to stop\n")
t0 = time.time()
try:
while not stop_event.is_set():
pose = np.eye(4)
pose[:3, 3][2] = 0.005 * np.sin(2 * np.pi * 0.3 * time.time() + np.pi)
euler_rot = [
0,
0,
0.5 * np.sin(2 * np.pi * 0.3 * time.time() + np.pi),
]
rot_mat = R.from_euler("xyz", euler_rot, degrees=False).as_matrix()
pose[:3, :3] = rot_mat
pose[:3, 3][2] += 0.01 * np.sin(2 * np.pi * 0.5 * time.time())
antennas = np.array([1, 1]) * np.sin(2 * np.pi * 0.5 * time.time())
reachy_mini.set_target(head=pose, antennas=antennas)
# Prepare state data for broadcasting
state_data = {
"timestamp": time.time(),
"head_pose": pose.tolist(),
"antennas": antennas.tolist(),
"euler_rotation": euler_rot,
}
# Queue state for WebSocket broadcasting (from sync context)
self.manager.queue_state(state_data)
time.sleep(0.02)
except KeyboardInterrupt:
pass
if __name__ == "__main__":
"""
Standalone testing mode - run this directly to test the app with a Reachy Mini
running on the same machine (localhost).
Usage:
python reachy_mini_app_example/main.py
Options:
- localhost_only=True: Connect to Reachy on localhost
- use_sim=True: Use simulation mode (if available)
"""
import signal
import sys
# Create the app instance
app = ExampleApp()
# Create a stop event for clean shutdown
stop_event = threading.Event()
shutdown_initiated = False
def signal_handler(sig, frame):
global shutdown_initiated
if not shutdown_initiated:
shutdown_initiated = True
print("\nπ Shutting down gracefully...")
stop_event.set()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
# Connect to Reachy Mini on localhost
print("π€ Connecting to Reachy Mini...")
try:
# Connect to Reachy Mini (localhost_only=True connects to local robot)
# Set use_sim=True if you want to test with simulation
reachy_mini = ReachyMini(
localhost_only=True, # Connect to robot on localhost
spawn_daemon=False, # Don't spawn a new daemon (daemon already running)
use_sim=False, # Set to True for simulation mode
timeout=5.0,
log_level='INFO'
)
print("β
Connected to Reachy Mini!")
# Run the app
app.run(reachy_mini, stop_event)
except KeyboardInterrupt:
print("\nπ Interrupted by user")
except Exception as e:
print(f"β Error connecting to Reachy Mini: {e}")
print("\nMake sure:")
print(" 1. Reachy Mini daemon is running on this machine")
print(" 2. The Reachy Mini service is started")
print(" 3. Try setting use_sim=True for simulation mode")
finally:
print("π Goodbye!")
|