# Repository file-viewer residue (not part of the program):
# 8bitkick - Init - e7efca5 - raw - history blame - 6.17 kB
import threading
import time
import asyncio
import json
from typing import List
from queue import Queue
import numpy as np
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
import uvicorn
from reachy_mini import ReachyMiniApp
from reachy_mini.reachy_mini import ReachyMini
from scipy.spatial.transform import Rotation as R
class ConnectionManager:
    """Track connected WebSocket clients and buffer state updates for them.

    ``active_connections`` lists every socket accepted through
    :meth:`connect`. ``state_queue`` is a ``queue.Queue`` that synchronous
    code fills through :meth:`queue_state`; whoever drains it is expected to
    hand each item to :meth:`broadcast`.
    """

    def __init__(self):
        self.active_connections: List[WebSocket] = []
        self.state_queue: Queue = Queue()

    async def connect(self, websocket: WebSocket):
        """Accept the WebSocket handshake and start tracking the client."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking ``websocket``; do nothing if it is not tracked.

        ``broadcast`` may already have dropped a client whose send failed, so
        a second removal must not raise ``ValueError``.
        """
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: dict):
        """Send ``message`` as JSON to every tracked client.

        Clients whose send raises are dropped from ``active_connections``;
        the error itself is deliberately not propagated (best effort).
        """
        # Iterate over a snapshot: every ``await`` yields to the event loop,
        # and other handlers may connect or disconnect in the meantime.
        for connection in list(self.active_connections):
            try:
                await connection.send_json(message)
            except Exception:
                self.disconnect(connection)

    def queue_state(self, state: dict):
        """Queue state from sync context"""
        self.state_queue.put(state)
class ExampleApp(ReachyMiniApp):
    """Demo app: animate the robot's head and antennas and stream the targets.

    A FastAPI application serves ``index.html`` at ``/`` and a WebSocket at
    ``/api/state/ws/full``. :meth:`run` drives the motion loop and queues each
    commanded target on ``self.manager`` for the WebSocket handlers to
    broadcast.
    """

    def __init__(self):
        super().__init__()
        self.app = FastAPI()
        self.manager = ConnectionManager()
        self.current_state = {}
        # Set when run() exits so that open WebSocket handlers stop looping
        # and the web server can finish shutting down.
        self._server_stopping = threading.Event()
        self.setup_routes()

    def setup_routes(self):
        """Register the index page and the state-streaming WebSocket route."""

        @self.app.get("/")
        async def read_root():
            # Relative path: resolved against the process working directory.
            return FileResponse("index.html")

        @self.app.websocket("/api/state/ws/full")
        async def websocket_endpoint(websocket: WebSocket):
            await self.manager.connect(websocket)
            try:
                # broadcast() swallows send failures and drops the socket
                # from active_connections, so a departed client is only
                # visible through that list. Watch it (and the stop flag)
                # instead of looping forever.
                while (
                    not self._server_stopping.is_set()
                    and websocket in self.manager.active_connections
                ):
                    # Check for new state data and broadcast
                    if not self.manager.state_queue.empty():
                        state = self.manager.state_queue.get()
                        await self.manager.broadcast(state)
                    await asyncio.sleep(0.01)
            except WebSocketDisconnect:
                pass
            finally:
                # Guarded so a socket already dropped by broadcast() is not
                # removed twice.
                if websocket in self.manager.active_connections:
                    self.manager.disconnect(websocket)

    def run(self, reachy_mini: ReachyMini, stop_event: threading.Event):
        """Serve the web UI and animate the robot until ``stop_event`` is set.

        Args:
            reachy_mini: Robot handle; only ``set_target(head=, antennas=)``
                is called on it.
            stop_event: Polled once per tick (about every 20 ms); setting it
                ends the loop.

        The web server runs in a daemon thread and is asked to exit when this
        method returns, so the port is released.
        """
        self._server_stopping.clear()

        # Same setup uvicorn.run() performs, but keeping the Server object
        # lets us request a shutdown when the app stops.
        config = uvicorn.Config(
            self.app, host="127.0.0.1", port=8000, log_level="info"
        )
        server = uvicorn.Server(config)
        server_thread = threading.Thread(target=server.run, daemon=True)
        server_thread.start()

        print("🚀 Web server started at http://127.0.0.1:8000")
        print("🔌 WebSocket available at ws://127.0.0.1:8000/api/state/ws/full")
        print("📄 Open http://127.0.0.1:8000 in your browser")
        print("Press Ctrl+C to stop\n")

        try:
            while not stop_event.is_set():
                # Sample the clock once so the pose, the antennas and the
                # reported timestamp all describe the same instant.
                now = time.time()
                slow_wave = np.sin(2 * np.pi * 0.3 * now + np.pi)  # 0.3 Hz
                fast_wave = np.sin(2 * np.pi * 0.5 * now)  # 0.5 Hz

                # Rotation about z only. NOTE(review): presumably radians and
                # metres, as implied by degrees=False - confirm with the
                # reachy_mini API.
                euler_rot = [0, 0, 0.5 * slow_wave]
                pose = np.eye(4)
                pose[:3, :3] = R.from_euler(
                    "xyz", euler_rot, degrees=False
                ).as_matrix()
                # z translation is the sum of the two oscillations.
                pose[2, 3] = 0.005 * slow_wave + 0.01 * fast_wave

                antennas = np.array([1, 1]) * fast_wave
                reachy_mini.set_target(head=pose, antennas=antennas)

                # Only publish while somebody is listening; otherwise the
                # queue would grow without bound (about 50 items/s).
                if self.manager.active_connections:
                    # Queue state for WebSocket broadcasting (sync context)
                    self.manager.queue_state(
                        {
                            "timestamp": now,
                            "head_pose": pose.tolist(),
                            "antennas": antennas.tolist(),
                            "euler_rotation": euler_rot,
                        }
                    )

                time.sleep(0.02)
        except KeyboardInterrupt:
            pass
        finally:
            # Let WebSocket handlers finish, then ask uvicorn to exit. The
            # join is bounded; the thread is a daemon, so a stuck shutdown
            # cannot keep the process alive.
            self._server_stopping.set()
            server.should_exit = True
            server_thread.join(timeout=5.0)
if __name__ == "__main__":
    # Standalone testing mode - run this directly to test the app with a
    # Reachy Mini running on the same machine (localhost).
    #
    # Usage:
    #     python reachy_mini_app_example/main.py
    #
    # Options:
    #     - localhost_only=True: Connect to Reachy on localhost
    #     - use_sim=True: Use simulation mode (if available)
    import signal
    import sys

    # Create the app instance
    app = ExampleApp()

    # Create a stop event for clean shutdown
    stop_event = threading.Event()

    def signal_handler(sig, frame):
        """Request a graceful stop on the first SIGINT, force exit on the next.

        Exiting from the handler straight away would bypass the loop's own
        shutdown path, so the first signal only sets ``stop_event``.
        """
        if stop_event.is_set():
            sys.exit(1)
        print("\n🛑 Shutting down gracefully...")
        stop_event.set()

    signal.signal(signal.SIGINT, signal_handler)

    # Connect to Reachy Mini on localhost
    print("🤖 Connecting to Reachy Mini...")
    try:
        try:
            # Connect to Reachy Mini (localhost_only=True connects to local robot)
            # Set use_sim=True if you want to test with simulation
            reachy_mini = ReachyMini(
                localhost_only=True,  # Connect to robot on localhost
                spawn_daemon=False,  # Don't spawn a new daemon (daemon already running)
                use_sim=False,  # Set to True for simulation mode
                timeout=5.0,
                log_level='INFO',
            )
        except Exception as e:
            # Only connection failures get the connection hints; errors
            # raised later by app.run() propagate with their own traceback.
            print(f"❌ Error connecting to Reachy Mini: {e}")
            print("\nMake sure:")
            print(" 1. Reachy Mini daemon is running on this machine")
            print(" 2. The Reachy Mini service is started")
            print(" 3. Try setting use_sim=True for simulation mode")
        else:
            print("✅ Connected to Reachy Mini!")
            # Run the app
            app.run(reachy_mini, stop_event)
    except KeyboardInterrupt:
        print("\n🛑 Interrupted by user")
    finally:
        print("👋 Goodbye!")