Skip to main content

Chapter 2: Whisper Integration & Cognitive Planning

Learning Objectives

  • Integrate OpenAI Whisper for speech-to-text
  • Use LLMs for high-level task planning
  • Translate natural language to robot actions
  • Build a complete conversational robot interface

2.1 Speech-to-Text with Whisper

Why Speech Interfaces?

Natural language is the most intuitive way for humans to communicate. For robots to be truly user-friendly, they need to understand speech.

Whisper Overview

OpenAI Whisper is a robust speech recognition model trained on 680,000 hours of multilingual data:

  • Multi-language support (99 languages)
  • Robust to accents and noise
  • Punctuation and capitalization
  • Multiple model sizes (tiny → large)

Installation

pip install openai-whisper
# Or for faster inference on GPU
pip install whisper-jax

Basic Usage

scripts/whisper_test.py
import whisper

# Model sizes trade speed for accuracy: tiny, base, small, medium, large.
asr_model = whisper.load_model("base")

# Run speech-to-text on a recorded voice command.
transcription = asr_model.transcribe("command.wav")

print(transcription["text"])
# Output: "Robot, please pick up the red cup."

ROS 2 Integration

humanoid_vla/speech_node.py
import io
import os
import tempfile
import wave

import numpy as np
import whisper

import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from audio_common_msgs.msg import AudioData

class WhisperNode(Node):
    """ROS 2 node that transcribes buffered audio with OpenAI Whisper.

    Subscribes to raw audio chunks on /audio/audio, accumulates them while
    recording is active, and publishes the transcribed text on /speech/text.
    """

    def __init__(self):
        super().__init__('whisper_node')

        # Load Whisper model once at startup (can take several seconds).
        self.get_logger().info('Loading Whisper model...')
        self.model = whisper.load_model("base")
        self.get_logger().info('Whisper model loaded')

        # Subscribe to audio
        self.audio_sub = self.create_subscription(
            AudioData,
            '/audio/audio',
            self.audio_callback,
            10
        )

        # Publish transcribed text
        self.text_pub = self.create_publisher(
            String,
            '/speech/text',
            10
        )

        # Buffer for audio chunks; only filled while is_recording is True.
        self.audio_buffer = []
        self.is_recording = False

    def audio_callback(self, msg):
        """Accumulate incoming audio data while recording is active."""
        if self.is_recording:
            self.audio_buffer.extend(msg.data)

    def start_recording(self):
        """Start buffering audio."""
        self.audio_buffer = []
        self.is_recording = True
        self.get_logger().info('Recording started')

    def stop_recording_and_transcribe(self):
        """Stop recording, transcribe the buffered audio, publish the text."""
        self.is_recording = False

        if not self.audio_buffer:
            return

        # Interpret the buffered bytes as 16-bit mono PCM samples.
        audio_np = np.array(self.audio_buffer, dtype=np.int16)

        # Write to a unique temporary WAV file instead of a fixed /tmp path:
        # a hard-coded name races between node instances and is a classic
        # symlink-attack target on shared machines.
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp:
            wav_path = tmp.name
        try:
            with wave.open(wav_path, 'wb') as wf:
                wf.setnchannels(1)      # mono
                wf.setsampwidth(2)      # 16-bit samples
                wf.setframerate(16000)  # 16 kHz, matching the audio source
                wf.writeframes(audio_np.tobytes())

            self.get_logger().info('Transcribing...')
            result = self.model.transcribe(wav_path)
            text = result["text"]
        finally:
            # Always remove the temp file, even if transcription fails.
            os.unlink(wav_path)

        self.get_logger().info(f'Transcribed: {text}')

        # Publish
        msg = String()
        msg.data = text
        self.text_pub.publish(msg)

        self.audio_buffer = []

def main(args=None):
    """Entry point: spin the Whisper node until shutdown."""
    rclpy.init(args=args)
    node = WhisperNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass  # Ctrl-C is the normal way to stop the node
    finally:
        # Release node resources before tearing down the rclpy context.
        node.destroy_node()
        rclpy.shutdown()

Voice Activity Detection (VAD)

Instead of manually starting/stopping:

import webrtcvad

class VADWhisperNode(WhisperNode):
    """WhisperNode variant that segments speech automatically with WebRTC VAD.

    Instead of requiring explicit start/stop calls, each incoming audio frame
    is classified as speech or silence; a transition from speech to silence
    triggers transcription of the accumulated frames.
    """

    def __init__(self):
        super().__init__()
        self.vad = webrtcvad.Vad(mode=2)  # Aggressiveness 0-3
        self.speech_frames = []

    def audio_callback(self, msg):
        """Detect speech and transcribe automatically.

        NOTE(review): webrtcvad only accepts 10/20/30 ms frames of 16-bit
        mono PCM — confirm the upstream audio publisher chunks accordingly.
        """
        frame = bytes(msg.data)

        # Check if frame contains speech
        is_speech = self.vad.is_speech(frame, sample_rate=16000)

        if is_speech:
            self.speech_frames.append(frame)
        elif self.speech_frames:
            # Speech just ended: transcribe everything gathered so far.
            self.transcribe_frames(self.speech_frames)
            self.speech_frames = []

    def transcribe_frames(self, frames):
        """Transcribe a list of raw PCM frames.

        The original snippet called this method without ever defining it
        (AttributeError at runtime); route the collected frames through the
        inherited buffer/transcribe/publish path instead.
        """
        self.audio_buffer = list(b''.join(frames))
        self.stop_recording_and_transcribe()

2.2 Cognitive Planning with LLMs

The Planning Problem

User says: "I'm thirsty."

Robot needs to infer:

  1. User wants a drink
  2. Find cup in kitchen
  3. Navigate to kitchen
  4. Detect cup location
  5. Grasp cup
  6. Navigate to user
  7. Hand over cup

This is cognitive planning—understanding intent and decomposing into actions.

LLM as Planner

humanoid_vla/llm_planner.py
import openai
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class LLMPlanner(Node):
    """ROS 2 node that turns transcribed speech into a JSON action plan.

    Subscribes to /speech/text, asks an OpenAI chat model to decompose the
    request into primitive robot actions, and publishes the resulting JSON
    plan on /robot/plan.
    """

    def __init__(self):
        super().__init__('llm_planner')

        # API key arrives as a ROS parameter so it never lives in source.
        self.declare_parameter('openai_api_key', '')
        api_key = self.get_parameter('openai_api_key').value
        if not api_key:
            # Warn at startup rather than failing on the first request.
            self.get_logger().warn('openai_api_key parameter is empty')
        openai.api_key = api_key

        # Subscribe to speech
        self.speech_sub = self.create_subscription(
            String,
            '/speech/text',
            self.speech_callback,
            10
        )

        # Publish plan
        self.plan_pub = self.create_publisher(
            String,
            '/robot/plan',
            10
        )

        # System prompt defining robot capabilities
        self.system_prompt = """
You are a humanoid robot assistant. You can perform these actions:
- navigate_to(location): Move to a location (kitchen, living_room, user)
- detect_object(object_name): Find an object using vision
- grasp_object(object_name): Pick up an object
- release_object(): Release current object
- say(text): Speak to the user

When given a user request, output a JSON plan with a list of actions.
Example:
User: "Bring me a cup from the kitchen"
Output:
{
"reasoning": "User wants cup from kitchen",
"actions": [
{"action": "navigate_to", "args": ["kitchen"]},
{"action": "detect_object", "args": ["cup"]},
{"action": "grasp_object", "args": ["cup"]},
{"action": "navigate_to", "args": ["user"]},
{"action": "say", "args": ["Here is your cup"]}
]
}
"""

    def speech_callback(self, msg):
        """Plan actions from user speech and publish the JSON plan."""
        user_request = msg.data

        self.get_logger().info(f'Planning for: {user_request}')

        # Call GPT-4. The network call is fallible; an uncaught exception
        # here would propagate out of the subscription callback and kill
        # the node, so handle failures explicitly.
        # NOTE(review): openai.ChatCompletion is the pre-1.0 SDK interface;
        # openai>=1.0 requires OpenAI().chat.completions.create — pin
        # openai<1.0 or migrate.
        try:
            response = openai.ChatCompletion.create(
                model="gpt-4",
                messages=[
                    {"role": "system", "content": self.system_prompt},
                    {"role": "user", "content": user_request}
                ],
                temperature=0.3  # Low temperature for consistency
            )
        except Exception as e:
            self.get_logger().error(f'LLM request failed: {e}')
            return

        plan_json = response.choices[0].message.content

        self.get_logger().info(f'Plan: {plan_json}')

        # Publish plan
        plan_msg = String()
        plan_msg.data = plan_json
        self.plan_pub.publish(plan_msg)

def main(args=None):
    """Entry point: spin the planner node until shutdown."""
    rclpy.init(args=args)
    node = LLMPlanner()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass  # Ctrl-C is the normal way to stop the node
    finally:
        # Release node resources before tearing down the rclpy context.
        node.destroy_node()
        rclpy.shutdown()

2.3 Action Translation

Grounding LLM Actions to ROS

The LLM outputs high-level actions. We need to translate these to ROS 2 service calls or action goals.

humanoid_vla/action_executor.py
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from std_msgs.msg import String
from nav2_msgs.action import NavigateToPose
from geometry_msgs.msg import PoseStamped
import json

class ActionExecutor(Node):
    """Executes JSON action plans by dispatching each step to ROS 2.

    Subscribes to /robot/plan (JSON produced by the LLM planner) and maps
    high-level actions to Nav2 goals or local stubs.
    """

    def __init__(self):
        super().__init__('action_executor')

        # Subscribe to plans
        self.plan_sub = self.create_subscription(
            String,
            '/robot/plan',
            self.plan_callback,
            10
        )

        # Navigation action client
        self.nav_client = ActionClient(
            self,
            NavigateToPose,
            'navigate_to_pose'
        )

        # Known locations: map-frame coordinates plus heading in radians.
        self.locations = {
            'kitchen': {'x': 5.0, 'y': 2.0, 'theta': 0.0},
            'living_room': {'x': -2.0, 'y': 1.0, 'theta': 1.57},
            'user': {'x': 0.0, 'y': 0.0, 'theta': 0.0}
        }

    def plan_callback(self, msg):
        """Parse a JSON plan and execute its actions in order."""
        try:
            plan = json.loads(msg.data)
            actions = plan['actions']

            self.get_logger().info(f"Executing {len(actions)} actions")

            for action_dict in actions:
                self.execute_action(action_dict)

        except Exception as e:
            # LLM output is untrusted: malformed JSON or missing keys land here.
            self.get_logger().error(f'Plan execution failed: {e}')

    def execute_action(self, action_dict):
        """Dispatch a single {'action': ..., 'args': [...]} dict to its handler."""
        action_name = action_dict['action']
        args = action_dict['args']

        if action_name == 'navigate_to':
            self.navigate_to(args[0])

        elif action_name == 'grasp_object':
            self.grasp_object(args[0])

        elif action_name == 'say':
            self.say(args[0])

        else:
            # Surface unsupported actions instead of silently skipping them.
            self.get_logger().warn(f'Unsupported action: {action_name}')

    def navigate_to(self, location_name):
        """Send a Nav2 goal for a named location and block until it finishes."""
        if location_name not in self.locations:
            self.get_logger().error(f'Unknown location: {location_name}')
            return

        loc = self.locations[location_name]

        # Don't send goals into the void: verify the Nav2 server is up.
        if not self.nav_client.wait_for_server(timeout_sec=5.0):
            self.get_logger().error('navigate_to_pose action server unavailable')
            return

        # Create goal
        goal_msg = NavigateToPose.Goal()
        goal_msg.pose.header.frame_id = 'map'
        goal_msg.pose.header.stamp = self.get_clock().now().to_msg()
        goal_msg.pose.pose.position.x = loc['x']
        goal_msg.pose.pose.position.y = loc['y']

        # Yaw-only rotation: the quaternion is (0, 0, sin(t/2), cos(t/2)),
        # so plain math replaces the tf_transformations dependency.
        import math
        half_yaw = loc['theta'] / 2.0
        goal_msg.pose.pose.orientation.x = 0.0
        goal_msg.pose.pose.orientation.y = 0.0
        goal_msg.pose.pose.orientation.z = math.sin(half_yaw)
        goal_msg.pose.pose.orientation.w = math.cos(half_yaw)

        # Send goal.
        # NOTE(review): spinning inside a subscription callback deadlocks a
        # single-threaded executor — run this node with a MultiThreadedExecutor
        # (callback in its own group) or move execution to a worker thread.
        self.get_logger().info(f'Navigating to {location_name}')
        future = self.nav_client.send_goal_async(goal_msg)
        rclpy.spin_until_future_complete(self, future)

        goal_handle = future.result()
        if goal_handle is None or not goal_handle.accepted:
            # A rejected goal has no result; bail out instead of crashing.
            self.get_logger().error(f'Navigation goal to {location_name} rejected')
            return

        result_future = goal_handle.get_result_async()
        rclpy.spin_until_future_complete(self, result_future)

        self.get_logger().info(f'Reached {location_name}')

    def grasp_object(self, object_name):
        """Grasp an object (stub - integrate with MoveIt)."""
        self.get_logger().info(f'Grasping {object_name}')
        # Implement MoveIt grasp here

    def say(self, text):
        """Text-to-speech (stub)."""
        self.get_logger().info(f'Robot says: {text}')
        # Implement TTS here

2.4 Complete System

Launch All Nodes

launch/conversational_robot.launch.py
from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    """Launch the speech -> plan -> action pipeline (three nodes)."""
    # The original snippet used os.getenv without importing os, which
    # raises NameError at launch time.
    import os

    return LaunchDescription([

        # Speech recognition
        Node(
            package='humanoid_vla',
            executable='whisper_node',
            name='whisper_node'
        ),

        # LLM planner (API key injected from the environment, not source)
        Node(
            package='humanoid_vla',
            executable='llm_planner',
            name='llm_planner',
            parameters=[{
                'openai_api_key': os.getenv('OPENAI_API_KEY')
            }]
        ),

        # Action executor
        Node(
            package='humanoid_vla',
            executable='action_executor',
            name='action_executor'
        ),

    ])

Demo Flow

User speaks: "I'm thirsty"

Whisper: "I'm thirsty"

LLM: Plan 5 actions (navigate, detect, grasp, navigate, handover)

Executor: Execute each action via ROS 2

Robot: Brings cup to user

2.5 Advanced Topics

Few-Shot Prompting

# Few-shot examples steer the model toward the exact plan format the
# executor parses; the live user request is appended after this prompt.
system_prompt = """
Example 1:
User: "Clean the table"
Plan: [navigate_to(table), detect_object(trash), grasp_object(trash), navigate_to(bin), release_object()]

Example 2:
User: "I'm cold"
Plan: [navigate_to(bedroom), detect_object(blanket), grasp_object(blanket), navigate_to(user), say("Here is a blanket")]

Now plan for the following:
"""

Safety Constraints

def validate_plan(plan):
    """Check plan doesn't violate safety rules.

    Returns False if any navigate_to step targets a restricted area,
    True otherwise.
    """
    restricted = ('basement', 'stairs')
    nav_targets = (
        step['args'][0]
        for step in plan['actions']
        if step['action'] == 'navigate_to'
    )
    return all(target not in restricted for target in nav_targets)

Summary

✅ Integrate Whisper for speech recognition
✅ Use LLMs for high-level planning
✅ Translate plans to ROS 2 actions
✅ Build complete conversational interface

Next: Sensors