Skip to main content

Chapter 2: Advanced ROS 2 Concepts

Learning Objectives

By the end of this chapter, you will be able to:

  • Create and use launch files to start complex systems
  • Manage node parameters for configuration
  • Implement lifecycle nodes for safety-critical applications
  • Use component composition for efficient communication
  • Integrate sensors into your ROS 2 system

2.1 Launch Files

The Problem with Manual Startup

Imagine starting a humanoid robot system:

# Terminal 1
ros2 run camera_driver realsense_node

# Terminal 2
ros2 run lidar_driver velodyne_node

# Terminal 3
ros2 run robot_state_publisher robot_state_publisher

# Terminal 4
ros2 run joint_controller controller_node

# ... 20 more terminals ...

This is impractical. Launch files solve this by defining the entire system in code.

Python Launch Files (ROS 2 Style)

ROS 2 uses Python for launch files, giving you the full power of a programming language.

launch/humanoid_bringup.launch.py
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, LogInfo
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
from ament_index_python.packages import get_package_share_directory
import os

def generate_launch_description():
"""
Launch a complete humanoid robot system.
"""

# Declare arguments
sim_arg = DeclareLaunchArgument(
'use_sim',
default_value='true',
description='Use simulation time'
)

robot_name_arg = DeclareLaunchArgument(
'robot_name',
default_value='humanoid_01',
description='Name of the robot'
)

# Get launch configuration values
use_sim = LaunchConfiguration('use_sim')
robot_name = LaunchConfiguration('robot_name')

# Find URDF file
pkg_path = get_package_share_directory('humanoid_description')
urdf_file = os.path.join(pkg_path, 'urdf', 'humanoid.urdf')

with open(urdf_file, 'r') as f:
robot_description = f.read()

# Define nodes
robot_state_publisher = Node(
package='robot_state_publisher',
executable='robot_state_publisher',
name='robot_state_publisher',
parameters=[{
'robot_description': robot_description,
'use_sim_time': use_sim
}],
output='screen'
)

joint_state_publisher = Node(
package='joint_state_publisher',
executable='joint_state_publisher',
name='joint_state_publisher',
parameters=[{
'use_sim_time': use_sim,
'source_list': ['joint_commands']
}]
)

controller_manager = Node(
package='controller_manager',
executable='ros2_control_node',
parameters=[{
'robot_description': robot_description,
'use_sim_time': use_sim
}],
output='screen'
)

# Camera driver
camera_driver = Node(
package='realsense2_camera',
executable='realsense2_camera_node',
name='camera',
parameters=[{
'enable_depth': True,
'enable_color': True,
'depth_module.profile': '640x480x30',
'rgb_camera.profile': '640x480x30'
}],
remappings=[
('depth/image_rect_raw', '/camera/depth/image'),
('color/image_raw', '/camera/rgb/image')
]
)

# RViz for visualization
rviz = Node(
package='rviz2',
executable='rviz2',
name='rviz2',
arguments=['-d', os.path.join(pkg_path, 'config', 'humanoid.rviz')],
parameters=[{'use_sim_time': use_sim}]
)

# Log message
log_message = LogInfo(
msg=['Launching humanoid robot: ', robot_name]
)

return LaunchDescription([
sim_arg,
robot_name_arg,
log_message,
robot_state_publisher,
joint_state_publisher,
controller_manager,
camera_driver,
rviz
])

Running Launch Files

# Basic usage
ros2 launch humanoid_bringup humanoid_bringup.launch.py

# With arguments
ros2 launch humanoid_bringup humanoid_bringup.launch.py use_sim:=false

# See available arguments
ros2 launch humanoid_bringup humanoid_bringup.launch.py --show-args

2.2 Parameters

Why Parameters?

Hard-coding values is bad:

# Bad - requires recompiling to change
CAMERA_FPS = 30

Parameters allow runtime configuration:

# Good - configurable without recompiling
self.declare_parameter('camera_fps', 30)
fps = self.get_parameter('camera_fps').value

Parameter Types

ROS 2 supports:

  • bool
  • int64
  • double
  • string
  • byte_array
  • Arrays of the above

Declaring and Using Parameters

humanoid_control/configurable_controller.py
import rclpy
from rclpy.node import Node
from rcl_interfaces.msg import ParameterDescriptor

class ConfigurableController(Node):
def __init__(self):
super().__init__('configurable_controller')

# Declare parameter with descriptor (documentation)
self.declare_parameter(
'update_rate',
50.0,
ParameterDescriptor(
description='Control loop frequency in Hz',
floating_point_range=[
{'from_value': 10.0, 'to_value': 200.0, 'step': 0.0}
]
)
)

self.declare_parameter('robot_name', 'humanoid')
self.declare_parameter('enable_logging', True)

# Get parameter values
self.update_rate = self.get_parameter('update_rate').value
robot_name = self.get_parameter('robot_name').value

self.get_logger().info(
f'Controller for {robot_name} running at {self.update_rate} Hz'
)

# Register parameter callback for dynamic reconfiguration
self.add_on_set_parameters_callback(self.parameter_callback)

def parameter_callback(self, params):
"""Called when parameters change."""
for param in params:
if param.name == 'update_rate':
self.get_logger().info(
f'Update rate changed to {param.value}'
)
# Recreate timer with new rate
# (in real code, you'd handle this more carefully)

return SetParametersResult(successful=True)

Setting Parameters

# From command line
ros2 run humanoid_control configurable_controller \
--ros-args -p update_rate:=100.0

# From a YAML file
ros2 run humanoid_control configurable_controller \
--ros-args --params-file config/controller.yaml

# At runtime
ros2 param set /configurable_controller update_rate 75.0

# List all parameters
ros2 param list /configurable_controller

Parameter YAML Files

config/controller.yaml
configurable_controller:
ros__parameters:
update_rate: 100.0
robot_name: "atlas"
enable_logging: true
gains:
kp: 10.0
ki: 0.1
kd: 1.0

2.3 Lifecycle Nodes

The Problem with Regular Nodes

Regular nodes start immediately:

  1. Constructor runs
  2. Node starts publishing/subscribing
  3. No way to "prepare" then "activate"

For safety-critical systems (humanoid robots!), we need:

  • Configuration phase: Load parameters, allocate memory
  • Activation phase: Enable motors, start publishing
  • Deactivation phase: Stop motors safely
  • Cleanup phase: Release resources

Lifecycle State Machine

┌─────────────┐
│ Unconfigured│
└──────┬──────┘
│ configure()

┌─────────────┐
│ Inactive │◄─────┐
└──────┬──────┘ │
│ activate() │ deactivate()
▼ │
┌─────────────┐ │
│ Active │──────┘
└──────┬──────┘
│ cleanup()

┌─────────────┐
│ Finalized │
└─────────────┘

Implementing a Lifecycle Node

humanoid_control/lifecycle_controller.py
import rclpy
from rclpy.lifecycle import Node, State, TransitionCallbackReturn
from rclpy.lifecycle import Publisher
from sensor_msgs.msg import JointState

class LifecycleController(Node):
def __init__(self):
super().__init__('lifecycle_controller')
self.joint_pub = None
self.timer = None

def on_configure(self, state: State) -> TransitionCallbackReturn:
"""
Called when transitioning Unconfigured -> Inactive.
Load parameters, allocate resources.
"""
self.get_logger().info('Configuring...')

# Declare parameters
self.declare_parameter('update_rate', 50.0)
rate = self.get_parameter('update_rate').value

# Create lifecycle publisher (not active yet)
self.joint_pub = self.create_lifecycle_publisher(
JointState,
'joint_commands',
10
)

# Create timer (not started yet)
self.timer_period = 1.0 / rate

self.get_logger().info('Configuration complete')
return TransitionCallbackReturn.SUCCESS

def on_activate(self, state: State) -> TransitionCallbackReturn:
"""
Called when transitioning Inactive -> Active.
Enable motors, start control loop.
"""
self.get_logger().info('Activating...')

# Start publishing
self.joint_pub.on_activate(state)

# Start timer
self.timer = self.create_timer(
self.timer_period,
self.control_callback
)

self.get_logger().info('Active - motors enabled')
return TransitionCallbackReturn.SUCCESS

def on_deactivate(self, state: State) -> TransitionCallbackReturn:
"""
Called when transitioning Active -> Inactive.
Stop motors safely.
"""
self.get_logger().info('Deactivating...')

# Stop timer
if self.timer:
self.timer.cancel()
self.timer = None

# Stop publishing
self.joint_pub.on_deactivate(state)

# Send zero commands to motors
self.send_zero_commands()

self.get_logger().info('Deactivated - motors disabled')
return TransitionCallbackReturn.SUCCESS

def on_cleanup(self, state: State) -> TransitionCallbackReturn:
"""
Called when transitioning Inactive -> Unconfigured.
Release resources.
"""
self.get_logger().info('Cleaning up...')

# Destroy publisher
self.destroy_publisher(self.joint_pub)
self.joint_pub = None

return TransitionCallbackReturn.SUCCESS

def on_shutdown(self, state: State) -> TransitionCallbackReturn:
"""
Called when shutting down from any state.
Emergency cleanup.
"""
self.get_logger().info('Shutting down...')

if hasattr(self, 'timer') and self.timer:
self.timer.cancel()

self.send_zero_commands()

return TransitionCallbackReturn.SUCCESS

def control_callback(self):
"""Main control loop."""
msg = JointState()
# Fill with control commands
self.joint_pub.publish(msg)

def send_zero_commands(self):
"""Send zero velocity to all motors."""
# Implementation here
pass

Controlling Lifecycle

# Get current state
ros2 lifecycle get /lifecycle_controller

# Transition through states
ros2 lifecycle set /lifecycle_controller configure
ros2 lifecycle set /lifecycle_controller activate

# Deactivate when needed
ros2 lifecycle set /lifecycle_controller deactivate

2.4 Component Composition

Zero-Copy Communication

Normally, data is copied between nodes:

Node A (process 1) → serialize → DDS → deserialize → Node B (process 2)

With component composition, nodes run in the same process:

Node A (shared memory) ← zero copy → Node B (same process)

This is critical for real-time systems with high-bandwidth data (camera images).

Creating a Component

humanoid_control/component_node.py
import rclpy
from rclpy.node import Node

class ImageProcessor(Node):
def __init__(self, options):
super().__init__('image_processor', **options)
# Node implementation
setup.py
setup(
# ...
entry_points={
'console_scripts': [
'image_processor = humanoid_control.component_node:main',
],
},
)

Running as Component

# Start container
ros2 run rclcpp_components component_container

# Load component
ros2 component load /ComponentManager humanoid_control \
humanoid_control.component_node.ImageProcessor

2.5 Sensor Integration

Camera (RealSense)

from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2

class CameraSubscriber(Node):
def __init__(self):
super().__init__('camera_subscriber')
self.bridge = CvBridge()

self.subscription = self.create_subscription(
Image,
'/camera/color/image_raw',
self.image_callback,
10
)

def image_callback(self, msg):
# Convert ROS Image to OpenCV
cv_image = self.bridge.imgmsg_to_cv2(msg, 'bgr8')

# Process image
gray = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY)

# Show
cv2.imshow('Camera Feed', gray)
cv2.waitKey(1)

LiDAR

from sensor_msgs.msg import LaserScan

class LidarSubscriber(Node):
def __init__(self):
super().__init__('lidar_subscriber')

self.subscription = self.create_subscription(
LaserScan,
'/scan',
self.scan_callback,
10
)

def scan_callback(self, msg):
# msg.ranges is list of distances
min_distance = min(msg.ranges)

if min_distance < 0.5: # 50cm
self.get_logger().warn('Obstacle detected!')

IMU

from sensor_msgs.msg import Imu
import numpy as np

class ImuSubscriber(Node):
def __init__(self):
super().__init__('imu_subscriber')

self.subscription = self.create_subscription(
Imu,
'/imu',
self.imu_callback,
10
)

def imu_callback(self, msg):
# Get orientation quaternion
q = msg.orientation

# Convert to Euler angles (if needed)
from tf_transformations import euler_from_quaternion
roll, pitch, yaw = euler_from_quaternion([q.x, q.y, q.z, q.w])

# Check if robot is tilted
if abs(pitch) > 0.5: # ~30 degrees
self.get_logger().error('Robot tilting!')

Summary

You've learned advanced ROS 2 concepts:

✅ Launch files for system management
✅ Parameters for configuration
✅ Lifecycle nodes for safety-critical applications
✅ Component composition for zero-copy performance
✅ Sensor integration patterns

Next: Module 2: Simulation