Chapter 2: Whisper Integration & Cognitive Planning
Learning Objectives
- Integrate OpenAI Whisper for speech-to-text
- Use LLMs for high-level task planning
- Translate natural language to robot actions
- Build a complete conversational robot interface
2.1 Speech-to-Text with Whisper
Why Speech Interfaces?
Natural language is the most intuitive way for humans to communicate. For robots to be truly user-friendly, they need to understand speech.
Whisper Overview
OpenAI Whisper is a robust speech recognition model trained on 680,000 hours of multilingual data:
- Multi-language support (99 languages)
- Robust to accents and noise
- Punctuation and capitalization
- Multiple model sizes (tiny → large)
Installation
pip install openai-whisper
# Or for faster inference on GPU
pip install whisper-jax
Basic Usage
scripts/whisper_test.py
import whisper
# Load model (one of: tiny, base, small, medium, large).
# Larger models are more accurate but slower and need more VRAM.
model = whisper.load_model("base")
# Transcribe audio file; result is a dict with "text", "segments", "language".
result = model.transcribe("command.wav")
print(result["text"])
# Output: "Robot, please pick up the red cup."
ROS 2 Integration
humanoid_vla/speech_node.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from audio_common_msgs.msg import AudioData
import whisper
import numpy as np
import io
import wave
class WhisperNode(Node):
    """Transcribe buffered microphone audio to text using OpenAI Whisper.

    Subscribes to raw audio on ``/audio/audio`` (audio_common_msgs/AudioData)
    and publishes transcribed text on ``/speech/text`` (std_msgs/String).
    Recording is gated externally via :meth:`start_recording` /
    :meth:`stop_recording_and_transcribe`.
    """

    def __init__(self):
        super().__init__('whisper_node')
        # Model load is slow; do it once at startup.
        self.get_logger().info('Loading Whisper model...')
        self.model = whisper.load_model("base")
        self.get_logger().info('Whisper model loaded')
        # Subscribe to audio
        self.audio_sub = self.create_subscription(
            AudioData,
            '/audio/audio',
            self.audio_callback,
            10
        )
        # Publish transcribed text
        self.text_pub = self.create_publisher(
            String,
            '/speech/text',
            10
        )
        # Raw PCM byte buffer; filled only while is_recording is True.
        self.audio_buffer = bytearray()
        self.is_recording = False

    def audio_callback(self, msg):
        """Accumulate raw audio bytes while recording is active."""
        if self.is_recording:
            # msg.data is a raw byte stream; keep it as bytes rather than a
            # list of ints so 16-bit samples can be decoded correctly later.
            self.audio_buffer.extend(msg.data)

    def start_recording(self):
        """Start buffering audio."""
        self.audio_buffer = bytearray()
        self.is_recording = True
        self.get_logger().info('Recording started')

    def stop_recording_and_transcribe(self):
        """Stop recording, transcribe the buffered audio, and publish text."""
        self.is_recording = False
        if not self.audio_buffer:
            return
        # BUG FIX: the buffer holds the raw BYTES of 16-bit PCM, so it must
        # be reinterpreted as int16 samples (np.frombuffer) -- the original
        # np.array(buffer, dtype=np.int16) converted each byte to a sample,
        # producing garbage audio. Assumes 16 kHz mono little-endian PCM --
        # TODO confirm against the audio publisher.
        data = bytes(self.audio_buffer)
        if len(data) % 2:
            # Drop a trailing odd byte from a sample split across chunks.
            data = data[:-1]
        audio_np = np.frombuffer(data, dtype=np.int16)
        # Save to a temporary WAV file for Whisper.
        with wave.open('/tmp/temp_audio.wav', 'wb') as wf:
            wf.setnchannels(1)      # mono
            wf.setsampwidth(2)      # 16-bit samples
            wf.setframerate(16000)  # 16 kHz
            wf.writeframes(audio_np.tobytes())
        # Transcribe
        self.get_logger().info('Transcribing...')
        result = self.model.transcribe('/tmp/temp_audio.wav')
        text = result["text"]
        self.get_logger().info(f'Transcribed: {text}')
        # Publish
        msg = String()
        msg.data = text
        self.text_pub.publish(msg)
        self.audio_buffer = bytearray()
def main(args=None):
    """Entry point: spin the Whisper node until interrupted."""
    rclpy.init(args=args)
    node = WhisperNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass  # Ctrl+C is the normal way to stop the node
    finally:
        # Release node resources and shut down even if spin raised.
        node.destroy_node()
        rclpy.shutdown()
Voice Activity Detection (VAD)
Instead of manually starting and stopping recording, voice activity detection can segment speech automatically:
import webrtcvad
class VADWhisperNode(WhisperNode):
    """WhisperNode variant that segments speech automatically with WebRTC VAD."""

    def __init__(self):
        super().__init__()
        # Aggressiveness 0 (least) to 3 (most aggressive non-speech filtering).
        self.vad = webrtcvad.Vad(mode=2)
        self.speech_frames = []

    def audio_callback(self, msg):
        """Detect speech and transcribe automatically when it ends."""
        frame = bytes(msg.data)
        # NOTE(review): webrtcvad only accepts 10/20/30 ms frames of 16-bit
        # mono PCM -- confirm the audio publisher's chunk size matches.
        is_speech = self.vad.is_speech(frame, sample_rate=16000)
        if is_speech:
            self.speech_frames.append(frame)
        elif self.speech_frames:
            # Speech just ended: transcribe the collected utterance.
            self.transcribe_frames(self.speech_frames)
            self.speech_frames = []

    def transcribe_frames(self, frames):
        """Transcribe a list of raw PCM frames and publish the text.

        BUG FIX: this method was called from audio_callback but was never
        defined in the original listing.
        """
        with wave.open('/tmp/vad_audio.wav', 'wb') as wf:
            wf.setnchannels(1)      # mono
            wf.setsampwidth(2)      # 16-bit samples
            wf.setframerate(16000)  # 16 kHz
            wf.writeframes(b''.join(frames))
        result = self.model.transcribe('/tmp/vad_audio.wav')
        text = result["text"]
        self.get_logger().info(f'Transcribed: {text}')
        msg = String()
        msg.data = text
        self.text_pub.publish(msg)
2.2 Cognitive Planning with LLMs
The Planning Problem
User says: "I'm thirsty."
Robot needs to infer:
- User wants a drink
- Find cup in kitchen
- Navigate to kitchen
- Detect cup location
- Grasp cup
- Navigate to user
- Hand over cup
This is cognitive planning—understanding intent and decomposing into actions.
LLM as Planner
humanoid_vla/llm_planner.py
import openai
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class LLMPlanner(Node):
    """Turn transcribed user speech into a JSON action plan via an LLM.

    Subscribes to ``/speech/text`` (std_msgs/String) and publishes the
    LLM-generated JSON plan on ``/robot/plan`` (std_msgs/String).
    """

    def __init__(self):
        super().__init__('llm_planner')
        # API key comes from a ROS parameter so it stays out of source code.
        self.declare_parameter('openai_api_key', '')
        api_key = self.get_parameter('openai_api_key').value
        openai.api_key = api_key
        # Subscribe to speech
        self.speech_sub = self.create_subscription(
            String,
            '/speech/text',
            self.speech_callback,
            10
        )
        # Publish plan
        self.plan_pub = self.create_publisher(
            String,
            '/robot/plan',
            10
        )
        # System prompt defining robot capabilities
        self.system_prompt = """
You are a humanoid robot assistant. You can perform these actions:
- navigate_to(location): Move to a location (kitchen, living_room, user)
- detect_object(object_name): Find an object using vision
- grasp_object(object_name): Pick up an object
- release_object(): Release current object
- say(text): Speak to the user
When given a user request, output a JSON plan with a list of actions.
Example:
User: "Bring me a cup from the kitchen"
Output:
{
"reasoning": "User wants cup from kitchen",
"actions": [
{"action": "navigate_to", "args": ["kitchen"]},
{"action": "detect_object", "args": ["cup"]},
{"action": "grasp_object", "args": ["cup"]},
{"action": "navigate_to", "args": ["user"]},
{"action": "say", "args": ["Here is your cup"]}
]
}
"""

    def speech_callback(self, msg):
        """Plan actions for a transcribed user request and publish the plan."""
        user_request = msg.data
        self.get_logger().info(f'Planning for: {user_request}')
        # NOTE(review): openai.ChatCompletion is the legacy (<1.0) API and was
        # removed in openai>=1.0 -- pin `openai<1.0` or migrate to
        # `openai.OpenAI().chat.completions.create(...)`.
        try:
            response = openai.ChatCompletion.create(
                model="gpt-4",
                messages=[
                    {"role": "system", "content": self.system_prompt},
                    {"role": "user", "content": user_request}
                ],
                temperature=0.3  # Low temperature for consistency
            )
        except Exception as e:
            # BUG FIX: an unhandled network/auth error here would crash the
            # whole node from inside the subscription callback.
            self.get_logger().error(f'LLM request failed: {e}')
            return
        plan_json = response.choices[0].message.content
        self.get_logger().info(f'Plan: {plan_json}')
        # Publish plan
        plan_msg = String()
        plan_msg.data = plan_json
        self.plan_pub.publish(plan_msg)
def main(args=None):
    """Entry point: spin the LLM planner node until interrupted."""
    rclpy.init(args=args)
    node = LLMPlanner()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass  # Ctrl+C is the normal way to stop the node
    finally:
        # Release node resources and shut down even if spin raised.
        node.destroy_node()
        rclpy.shutdown()
2.3 Action Translation
Grounding LLM Actions to ROS
The LLM outputs high-level actions. We need to translate these to ROS 2 service calls or action goals.
humanoid_vla/action_executor.py
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from std_msgs.msg import String
from nav2_msgs.action import NavigateToPose
from geometry_msgs.msg import PoseStamped
import json
class ActionExecutor(Node):
    """Execute JSON plans from the LLM planner as concrete ROS 2 actions.

    Subscribes to ``/robot/plan`` (std_msgs/String containing JSON) and
    dispatches each action to a handler (Nav2 navigation, grasping, TTS).
    """

    def __init__(self):
        super().__init__('action_executor')
        # Subscribe to plans
        self.plan_sub = self.create_subscription(
            String,
            '/robot/plan',
            self.plan_callback,
            10
        )
        # Navigation action client
        self.nav_client = ActionClient(
            self,
            NavigateToPose,
            'navigate_to_pose'
        )
        # Known locations in the 'map' frame: x, y in metres, theta in radians.
        self.locations = {
            'kitchen': {'x': 5.0, 'y': 2.0, 'theta': 0.0},
            'living_room': {'x': -2.0, 'y': 1.0, 'theta': 1.57},
            'user': {'x': 0.0, 'y': 0.0, 'theta': 0.0}
        }

    def plan_callback(self, msg):
        """Parse the JSON plan and execute its actions in order."""
        try:
            plan = json.loads(msg.data)
            actions = plan['actions']
            self.get_logger().info(f"Executing {len(actions)} actions")
            for action_dict in actions:
                self.execute_action(action_dict)
        except Exception as e:
            # LLM output is untrusted: malformed JSON or a failed action must
            # not crash the node.
            self.get_logger().error(f'Plan execution failed: {e}')

    def execute_action(self, action_dict):
        """Dispatch a single action dict to its handler."""
        action_name = action_dict['action']
        # BUG FIX: tolerate a missing 'args' key in LLM-generated actions.
        args = action_dict.get('args', [])
        if action_name == 'navigate_to':
            self.navigate_to(args[0])
        elif action_name == 'grasp_object':
            self.grasp_object(args[0])
        elif action_name == 'say':
            self.say(args[0])
        else:
            # Previously unknown actions were silently ignored.
            self.get_logger().warning(f'Unknown action: {action_name}')
        # Add more action handlers...

    def navigate_to(self, location_name):
        """Navigate to a named location via the Nav2 NavigateToPose action."""
        import math  # stdlib quaternion math; replaces tf_transformations

        if location_name not in self.locations:
            self.get_logger().error(f'Unknown location: {location_name}')
            return
        loc = self.locations[location_name]
        # Create goal
        goal_msg = NavigateToPose.Goal()
        goal_msg.pose.header.frame_id = 'map'
        goal_msg.pose.header.stamp = self.get_clock().now().to_msg()
        goal_msg.pose.pose.position.x = loc['x']
        goal_msg.pose.pose.position.y = loc['y']
        # Yaw-only rotation: quaternion is (0, 0, sin(theta/2), cos(theta/2)).
        half_theta = loc['theta'] / 2.0
        goal_msg.pose.pose.orientation.x = 0.0
        goal_msg.pose.pose.orientation.y = 0.0
        goal_msg.pose.pose.orientation.z = math.sin(half_theta)
        goal_msg.pose.pose.orientation.w = math.cos(half_theta)
        # BUG FIX: fail fast if the Nav2 action server is not up instead of
        # blocking forever on an unacknowledged goal.
        if not self.nav_client.wait_for_server(timeout_sec=5.0):
            self.get_logger().error('Navigation action server not available')
            return
        # Send goal
        self.get_logger().info(f'Navigating to {location_name}')
        # NOTE(review): spinning inside a subscription callback deadlocks with
        # the default single-threaded executor -- requires a multi-threaded
        # executor / reentrant callback group. TODO restructure to async.
        future = self.nav_client.send_goal_async(goal_msg)
        rclpy.spin_until_future_complete(self, future)
        goal_handle = future.result()
        # BUG FIX: the original assumed the goal was always accepted and would
        # raise AttributeError on a rejected/None handle.
        if goal_handle is None or not goal_handle.accepted:
            self.get_logger().error(f'Navigation goal to {location_name} rejected')
            return
        result_future = goal_handle.get_result_async()
        rclpy.spin_until_future_complete(self, result_future)
        self.get_logger().info(f'Reached {location_name}')

    def grasp_object(self, object_name):
        """Grasp an object (stub - integrate with MoveIt)."""
        self.get_logger().info(f'Grasping {object_name}')
        # Implement MoveIt grasp here

    def say(self, text):
        """Text-to-speech (stub)."""
        self.get_logger().info(f'Robot says: {text}')
        # Implement TTS here
2.4 Complete System
Launch All Nodes
launch/conversational_robot.launch.py
import os

from launch import LaunchDescription
from launch_ros.actions import Node
def generate_launch_description():
    """Launch the speech, planner, and executor nodes of the pipeline.

    BUG FIX: the original listing used ``os.getenv`` without importing
    ``os``; the import is now at the top of the file. The API key also
    falls back to '' so a missing environment variable does not pass
    ``None`` as a parameter value (which would fail at launch).
    """
    return LaunchDescription([
        # Speech recognition
        Node(
            package='humanoid_vla',
            executable='whisper_node',
            name='whisper_node'
        ),
        # LLM planner
        Node(
            package='humanoid_vla',
            executable='llm_planner',
            name='llm_planner',
            parameters=[{
                'openai_api_key': os.getenv('OPENAI_API_KEY', '')
            }]
        ),
        # Action executor
        Node(
            package='humanoid_vla',
            executable='action_executor',
            name='action_executor'
        ),
    ])
Demo Flow
User speaks: "I'm thirsty"
↓
Whisper: "I'm thirsty"
↓
LLM: Plan 5 actions (navigate, detect, grasp, navigate, handover)
↓
Executor: Execute each action via ROS 2
↓
Robot: Brings cup to user
2.5 Advanced Topics
Few-Shot Prompting
# Few-shot prompting: worked examples steer the LLM toward the exact
# plan format the executor expects, improving output consistency.
system_prompt = """
Example 1:
User: "Clean the table"
Plan: [navigate_to(table), detect_object(trash), grasp_object(trash), navigate_to(bin), release_object()]
Example 2:
User: "I'm cold"
Plan: [navigate_to(bedroom), detect_object(blanket), grasp_object(blanket), navigate_to(user), say("Here is a blanket")]
Now plan for the following:
"""
Safety Constraints
def validate_plan(plan):
    """Check that a plan does not violate safety rules.

    Args:
        plan: dict with an 'actions' list of {'action': str, 'args': list}.

    Returns:
        False if any navigate_to action targets a restricted area,
        True otherwise.
    """
    restricted_areas = {'basement', 'stairs'}
    for action in plan['actions']:
        if action['action'] == 'navigate_to':
            # BUG FIX: guard against a missing/empty args list from the LLM,
            # which previously raised KeyError/IndexError here.
            args = action.get('args') or []
            if args and args[0] in restricted_areas:
                return False
    return True
Summary
✅ Integrate Whisper for speech recognition
✅ Use LLMs for high-level planning
✅ Translate plans to ROS 2 actions
✅ Build complete conversational interface
Next: Sensors