Chapter 2: Advanced ROS 2 Concepts
Learning Objectives
By the end of this chapter, you will be able to:
- Create and use launch files to start complex systems
- Manage node parameters for configuration
- Implement lifecycle nodes for safety-critical applications
- Use component composition for efficient communication
- Integrate sensors into your ROS 2 system
2.1 Launch Files
The Problem with Manual Startup
Imagine starting a humanoid robot system:
# Terminal 1
ros2 run camera_driver realsense_node
# Terminal 2
ros2 run lidar_driver velodyne_node
# Terminal 3
ros2 run robot_state_publisher robot_state_publisher
# Terminal 4
ros2 run joint_controller controller_node
# ... 20 more terminals ...
This is impractical. Launch files solve this by defining the entire system in code.
Python Launch Files (ROS 2 Style)
ROS 2 uses Python for launch files, giving you the full power of a programming language.
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, LogInfo
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
from ament_index_python.packages import get_package_share_directory
import os
def generate_launch_description():
"""
Launch a complete humanoid robot system.
"""
# Declare arguments
sim_arg = DeclareLaunchArgument(
'use_sim',
default_value='true',
description='Use simulation time'
)
robot_name_arg = DeclareLaunchArgument(
'robot_name',
default_value='humanoid_01',
description='Name of the robot'
)
# Get launch configuration values
use_sim = LaunchConfiguration('use_sim')
robot_name = LaunchConfiguration('robot_name')
# Find URDF file
pkg_path = get_package_share_directory('humanoid_description')
urdf_file = os.path.join(pkg_path, 'urdf', 'humanoid.urdf')
with open(urdf_file, 'r') as f:
robot_description = f.read()
# Define nodes
robot_state_publisher = Node(
package='robot_state_publisher',
executable='robot_state_publisher',
name='robot_state_publisher',
parameters=[{
'robot_description': robot_description,
'use_sim_time': use_sim
}],
output='screen'
)
joint_state_publisher = Node(
package='joint_state_publisher',
executable='joint_state_publisher',
name='joint_state_publisher',
parameters=[{
'use_sim_time': use_sim,
'source_list': ['joint_commands']
}]
)
controller_manager = Node(
package='controller_manager',
executable='ros2_control_node',
parameters=[{
'robot_description': robot_description,
'use_sim_time': use_sim
}],
output='screen'
)
# Camera driver
camera_driver = Node(
package='realsense2_camera',
executable='realsense2_camera_node',
name='camera',
parameters=[{
'enable_depth': True,
'enable_color': True,
'depth_module.profile': '640x480x30',
'rgb_camera.profile': '640x480x30'
}],
remappings=[
('depth/image_rect_raw', '/camera/depth/image'),
('color/image_raw', '/camera/rgb/image')
]
)
# RViz for visualization
rviz = Node(
package='rviz2',
executable='rviz2',
name='rviz2',
arguments=['-d', os.path.join(pkg_path, 'config', 'humanoid.rviz')],
parameters=[{'use_sim_time': use_sim}]
)
# Log message
log_message = LogInfo(
msg=['Launching humanoid robot: ', robot_name]
)
return LaunchDescription([
sim_arg,
robot_name_arg,
log_message,
robot_state_publisher,
joint_state_publisher,
controller_manager,
camera_driver,
rviz
])
Running Launch Files
# Basic usage
ros2 launch humanoid_bringup humanoid_bringup.launch.py
# With arguments
ros2 launch humanoid_bringup humanoid_bringup.launch.py use_sim:=false
# See available arguments
ros2 launch humanoid_bringup humanoid_bringup.launch.py --show-args
2.2 Parameters
Why Parameters?
Hard-coding values is bad:
# Bad - requires recompiling to change
CAMERA_FPS = 30
Parameters allow runtime configuration:
# Good - configurable without recompiling
self.declare_parameter('camera_fps', 30)
fps = self.get_parameter('camera_fps').value
Parameter Types
ROS 2 supports:
boolint64doublestringbyte_array- Arrays of the above
Declaring and Using Parameters
import rclpy
from rclpy.node import Node
from rcl_interfaces.msg import ParameterDescriptor
class ConfigurableController(Node):
def __init__(self):
super().__init__('configurable_controller')
# Declare parameter with descriptor (documentation)
self.declare_parameter(
'update_rate',
50.0,
ParameterDescriptor(
description='Control loop frequency in Hz',
floating_point_range=[
{'from_value': 10.0, 'to_value': 200.0, 'step': 0.0}
]
)
)
self.declare_parameter('robot_name', 'humanoid')
self.declare_parameter('enable_logging', True)
# Get parameter values
self.update_rate = self.get_parameter('update_rate').value
robot_name = self.get_parameter('robot_name').value
self.get_logger().info(
f'Controller for {robot_name} running at {self.update_rate} Hz'
)
# Register parameter callback for dynamic reconfiguration
self.add_on_set_parameters_callback(self.parameter_callback)
def parameter_callback(self, params):
"""Called when parameters change."""
for param in params:
if param.name == 'update_rate':
self.get_logger().info(
f'Update rate changed to {param.value}'
)
# Recreate timer with new rate
# (in real code, you'd handle this more carefully)
return SetParametersResult(successful=True)
Setting Parameters
# From command line
ros2 run humanoid_control configurable_controller \
--ros-args -p update_rate:=100.0
# From a YAML file
ros2 run humanoid_control configurable_controller \
--ros-args --params-file config/controller.yaml
# At runtime
ros2 param set /configurable_controller update_rate 75.0
# List all parameters
ros2 param list /configurable_controller
Parameter YAML Files
configurable_controller:
ros__parameters:
update_rate: 100.0
robot_name: "atlas"
enable_logging: true
gains:
kp: 10.0
ki: 0.1
kd: 1.0
2.3 Lifecycle Nodes
The Problem with Regular Nodes
Regular nodes start immediately:
- Constructor runs
- Node starts publishing/subscribing
- No way to "prepare" then "activate"
For safety-critical systems (humanoid robots!), we need:
- Configuration phase: Load parameters, allocate memory
- Activation phase: Enable motors, start publishing
- Deactivation phase: Stop motors safely
- Cleanup phase: Release resources
Lifecycle State Machine
┌─────────────┐
│ Unconfigured│
└──────┬──────┘
│ configure()
▼
┌─────────────┐
│ Inactive │◄─────┐
└──────┬──────┘ │
│ activate() │ deactivate()
▼ │
┌─────────────┐ │
│ Active │──────┘
└──────┬──────┘
│ cleanup()
▼
┌─────────────┐
│ Finalized │
└─────────────┘
Implementing a Lifecycle Node
import rclpy
from rclpy.lifecycle import Node, State, TransitionCallbackReturn
from rclpy.lifecycle import Publisher
from sensor_msgs.msg import JointState
class LifecycleController(Node):
def __init__(self):
super().__init__('lifecycle_controller')
self.joint_pub = None
self.timer = None
def on_configure(self, state: State) -> TransitionCallbackReturn:
"""
Called when transitioning Unconfigured -> Inactive.
Load parameters, allocate resources.
"""
self.get_logger().info('Configuring...')
# Declare parameters
self.declare_parameter('update_rate', 50.0)
rate = self.get_parameter('update_rate').value
# Create lifecycle publisher (not active yet)
self.joint_pub = self.create_lifecycle_publisher(
JointState,
'joint_commands',
10
)
# Create timer (not started yet)
self.timer_period = 1.0 / rate
self.get_logger().info('Configuration complete')
return TransitionCallbackReturn.SUCCESS
def on_activate(self, state: State) -> TransitionCallbackReturn:
"""
Called when transitioning Inactive -> Active.
Enable motors, start control loop.
"""
self.get_logger().info('Activating...')
# Start publishing
self.joint_pub.on_activate(state)
# Start timer
self.timer = self.create_timer(
self.timer_period,
self.control_callback
)
self.get_logger().info('Active - motors enabled')
return TransitionCallbackReturn.SUCCESS
def on_deactivate(self, state: State) -> TransitionCallbackReturn:
"""
Called when transitioning Active -> Inactive.
Stop motors safely.
"""
self.get_logger().info('Deactivating...')
# Stop timer
if self.timer:
self.timer.cancel()
self.timer = None
# Stop publishing
self.joint_pub.on_deactivate(state)
# Send zero commands to motors
self.send_zero_commands()
self.get_logger().info('Deactivated - motors disabled')
return TransitionCallbackReturn.SUCCESS
def on_cleanup(self, state: State) -> TransitionCallbackReturn:
"""
Called when transitioning Inactive -> Unconfigured.
Release resources.
"""
self.get_logger().info('Cleaning up...')
# Destroy publisher
self.destroy_publisher(self.joint_pub)
self.joint_pub = None
return TransitionCallbackReturn.SUCCESS
def on_shutdown(self, state: State) -> TransitionCallbackReturn:
"""
Called when shutting down from any state.
Emergency cleanup.
"""
self.get_logger().info('Shutting down...')
if hasattr(self, 'timer') and self.timer:
self.timer.cancel()
self.send_zero_commands()
return TransitionCallbackReturn.SUCCESS
def control_callback(self):
"""Main control loop."""
msg = JointState()
# Fill with control commands
self.joint_pub.publish(msg)
def send_zero_commands(self):
"""Send zero velocity to all motors."""
# Implementation here
pass
Controlling Lifecycle
# Get current state
ros2 lifecycle get /lifecycle_controller
# Transition through states
ros2 lifecycle set /lifecycle_controller configure
ros2 lifecycle set /lifecycle_controller activate
# Deactivate when needed
ros2 lifecycle set /lifecycle_controller deactivate
2.4 Component Composition
Zero-Copy Communication
Normally, data is copied between nodes:
Node A (process 1) → serialize → DDS → deserialize → Node B (process 2)
With component composition, nodes run in the same process:
Node A (shared memory) ← zero copy → Node B (same process)
This is critical for real-time systems with high-bandwidth data (camera images).
Creating a Component
import rclpy
from rclpy.node import Node
class ImageProcessor(Node):
def __init__(self, options):
super().__init__('image_processor', **options)
# Node implementation
setup(
# ...
entry_points={
'console_scripts': [
'image_processor = humanoid_control.component_node:main',
],
},
)
Running as Component
# Start container
ros2 run rclcpp_components component_container
# Load component
ros2 component load /ComponentManager humanoid_control \
humanoid_control.component_node.ImageProcessor
2.5 Sensor Integration
Camera (RealSense)
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
class CameraSubscriber(Node):
def __init__(self):
super().__init__('camera_subscriber')
self.bridge = CvBridge()
self.subscription = self.create_subscription(
Image,
'/camera/color/image_raw',
self.image_callback,
10
)
def image_callback(self, msg):
# Convert ROS Image to OpenCV
cv_image = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
# Process image
gray = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY)
# Show
cv2.imshow('Camera Feed', gray)
cv2.waitKey(1)
LiDAR
from sensor_msgs.msg import LaserScan
class LidarSubscriber(Node):
def __init__(self):
super().__init__('lidar_subscriber')
self.subscription = self.create_subscription(
LaserScan,
'/scan',
self.scan_callback,
10
)
def scan_callback(self, msg):
# msg.ranges is list of distances
min_distance = min(msg.ranges)
if min_distance < 0.5: # 50cm
self.get_logger().warn('Obstacle detected!')
IMU
from sensor_msgs.msg import Imu
import numpy as np
class ImuSubscriber(Node):
def __init__(self):
super().__init__('imu_subscriber')
self.subscription = self.create_subscription(
Imu,
'/imu',
self.imu_callback,
10
)
def imu_callback(self, msg):
# Get orientation quaternion
q = msg.orientation
# Convert to Euler angles (if needed)
from tf_transformations import euler_from_quaternion
roll, pitch, yaw = euler_from_quaternion([q.x, q.y, q.z, q.w])
# Check if robot is tilted
if abs(pitch) > 0.5: # ~30 degrees
self.get_logger().error('Robot tilting!')
Summary
You've learned advanced ROS 2 concepts:
✅ Launch files for system management
✅ Parameters for configuration
✅ Lifecycle nodes for safety-critical applications
✅ Component composition for zero-copy performance
✅ Sensor integration patterns
Next: Module 2: Simulation