Dynamic Path Planning and Real-time Obstacle Avoidance

In the real world, robots don’t operate in static environments. They must navigate around moving obstacles, avoid dynamic hazards, and adapt their paths in real-time. Today, we’ll explore advanced path planning algorithms designed for dynamic environments, including the Dynamic Window Approach (DWA) and Timed Elastic Band (TEB) algorithm.

Introduction to Dynamic Path Planning

Static path planning assumes a fixed environment, but real-world scenarios require:

Dynamic Window Approach (DWA)

DWA is a popular real-time obstacle avoidance algorithm that generates collision-free trajectories by evaluating the robot’s dynamic capabilities.

DWA Fundamentals

DWA works by:

  1. Velocity space sampling: Generate possible velocity commands
  2. Trajectory evaluation: Simulate trajectories and score them
  3. Obstacle checking: Check for collisions with moving obstacles
  4. Goal attraction: Score trajectories based on goal proximity
  5. Velocity constraints: Respect robot’s dynamic limits

DWA Implementation

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Circle, Rectangle
import matplotlib.animation as animation
from scipy.interpolate import interp1d

class DynamicWindow:
    """Dynamic window for velocity sampling"""
    def __init__(self, robot):
        self.robot = robot
    
    def calculate_window(self, v, w, dt):
        """Calculate dynamic window based on robot dynamics"""
        # Current velocity
        v_curr = v
        w_curr = w
        
        # Dynamic constraints
        v_max = self.robot.max_velocity
        w_max = self.robot.max_angular_velocity
        a_max = self.robot.max_acceleration
        alpha_max = self.robot.max_angular_acceleration
        
        # Velocity window
        v_min = max(0, v_curr - a_max * dt)
        v_max = min(v_max, v_curr + a_max * dt)
        w_min = max(-w_max, w_curr - alpha_max * dt)
        w_max = min(w_max, w_curr + alpha_max * dt)
        
        return v_min, v_max, w_min, w_max

class RobotDWA:
    def __init__(self, max_velocity=1.0, max_angular_velocity=1.0, 
                 max_acceleration=0.5, max_angular_acceleration=1.0,
                 goal_threshold=0.3, obstacle_threshold=0.5):
        """
        Initialize DWA robot
        
        Args:
            max_velocity: Maximum linear velocity (m/s)
            max_angular_velocity: Maximum angular velocity (rad/s)
            max_acceleration: Maximum linear acceleration (m/s²)
            max_angular_acceleration: Maximum angular acceleration (rad/s²)
            goal_threshold: Distance to goal for completion
            obstacle_threshold: Distance threshold for obstacle avoidance
        """
        self.max_velocity = max_velocity
        self.max_angular_velocity = max_angular_velocity
        self.max_acceleration = max_acceleration
        self.max_angular_acceleration = max_angular_acceleration
        self.goal_threshold = goal_threshold
        self.obstacle_threshold = obstacle_threshold
        
        self.dynamic_window = DynamicWindow(self)
    
    def predict_trajectory(self, v, w, x, y, theta, dt=0.1, horizon=3.0):
        """
        Predict robot trajectory for given velocity commands
        
        Args:
            v: Linear velocity
            w: Angular velocity
            x, y, theta: Current robot state
            dt: Time step
            horizon: Prediction horizon
        
        Returns:
            Trajectory points and time stamps
        """
        trajectory = []
        time_stamps = []
        
        t = 0
        curr_x, curr_y, curr_theta = x, y, theta
        
        while t < horizon:
            # Update robot state
            curr_x += v * np.cos(curr_theta) * dt
            curr_y += v * np.sin(curr_theta) * dt
            curr_theta += w * dt
            
            trajectory.append([curr_x, curr_y, curr_theta])
            time_stamps.append(t)
            
            t += dt
        
        return np.array(trajectory), np.array(time_stamps)
    
    def calculate_goal_distance(self, trajectory, goal_x, goal_y):
        """Calculate distance to goal for trajectory"""
        final_pos = trajectory[-1, :2]
        return np.linalg.norm(final_pos - np.array([goal_x, goal_y]))
    
    def calculate_obstacle_cost(self, trajectory, obstacles):
        """Calculate cost related to obstacles"""
        min_distance = float('inf')
        
        for i, point in enumerate(trajectory):
            for obs in obstacles:
                obs_x, obs_y, obs_radius = obs
                distance = np.sqrt((point[0] - obs_x)**2 + (point[1] - obs_y)**2)
                min_distance = min(min_distance, distance - obs_radius)
        
        # Convert distance to cost (closer = higher cost)
        if min_distance < self.obstacle_threshold:
            return 1.0 / (min_distance + 0.1)  # High cost when close
        else:
            return 0.0  # No cost when far
    
    def calculate_velocity_cost(self, v, w):
        """Calculate cost based on velocity (prefer moderate speeds)"""
        v_cost = abs(v) / self.max_velocity
        w_cost = abs(w) / self.max_angular_velocity
        return 0.1 * (v_cost + w_cost)
    
    def evaluate_trajectory(self, trajectory, goal_x, goal_y, obstacles):
        """Evaluate trajectory score"""
        # Goal distance cost (closer is better)
        goal_distance = self.calculate_goal_distance(trajectory, goal_x, goal_y)
        goal_cost = goal_distance
        
        # Obstacle cost (farther is better)
        obstacle_cost = self.calculate_obstacle_cost(trajectory, obstacles)
        
        # Velocity cost (moderate speeds preferred)
        v = trajectory[0, 0]  # Approximate linear velocity
        w = trajectory[0, 2]  # Approximate angular velocity
        velocity_cost = self.calculate_velocity_cost(v, w)
        
        # Total cost (lower is better)
        total_cost = goal_cost + 10.0 * obstacle_cost + velocity_cost
        
        return -total_cost  # Negative for maximization
    
    def plan_motion(self, x, y, theta, goal_x, goal_y, obstacles, dt=0.1):
        """
        Plan motion using DWA
        
        Args:
            x, y, theta: Current robot state
            goal_x, goal_y: Goal position
            obstacles: List of obstacle positions and radii
            dt: Time step
        
        Returns:
            Best velocity commands (v, w)
        """
        # Get dynamic window
        v_min, v_max, w_min, w_max = self.dynamic_window.calculate_window(0, 0, dt)
        
        # Sample velocities
        v_samples = np.linspace(v_min, v_max, 15)
        w_samples = np.linspace(w_min, w_max, 20)
        
        best_score = -float('inf')
        best_v, best_w = 0, 0
        
        # Evaluate all velocity combinations
        for v in v_samples:
            for w in w_samples:
                # Predict trajectory
                trajectory, _ = self.predict_trajectory(v, w, x, y, theta, dt)
                
                # Evaluate trajectory
                score = self.evaluate_trajectory(trajectory, goal_x, goal_y, obstacles)
                
                # Update best
                if score > best_score:
                    best_score = score
                    best_v, best_w = v, w
        
        return best_v, best_w

# Example DWA usage
class SimpleRobot:
    def __init__(self):
        self.max_velocity = 1.0
        self.max_angular_velocity = 1.0
        self.max_acceleration = 0.5
        self.max_angular_acceleration = 1.0

robot = SimpleRobot()
dwa = RobotDWA()

# Test DWA
x, y, theta = 0, 0, 0
goal_x, goal_y = 5, 5
obstacles = [(2, 2, 0.3), (3, 4, 0.4), (4, 1, 0.3)]

v, w = dwa.plan_motion(x, y, theta, goal_x, goal_y, obstacles)
print(f"Best velocity commands: v={v:.3f}, w={w:.3f}")

Timed Elastic Band (TEB)

TEB is an elastic band approach that optimizes trajectories in both space and time, making it ideal for dynamic environments.

TEB Fundamentals

TEB works by:

  1. Initial path: Generate an initial path (e.g., from A*)
  2. Elastic deformation: Apply forces to make the path elastic
  3. Time optimization: Optimize timing for each segment
  4. Constraint satisfaction: Respect robot dynamics and constraints
  5. Real-time adaptation: Continuously update the path

TEB Implementation

class TEBPlanner:
    def __init__(self, robot, num_points=20, dt=0.1):
        """
        Initialize TEB planner
        
        Args:
            robot: Robot with dynamic constraints
            num_points: Number of points in the elastic band
            dt: Time step for trajectory
        """
        self.robot = robot
        self.num_points = num_points
        self.dt = dt
        
        # Elastic band parameters
        self.k_spring = 50.0  # Spring constant
        self.k_damping = 10.0  # Damping constant
        self.k_obstacle = 100.0  # Obstacle repulsion constant
        
        # Initialize path
        self.path = None
        self.velocities = None
        self.times = None
    
    def initialize_path(self, start, goal, obstacles):
        """Initialize elastic band with straight line path"""
        # Create initial straight line path
        self.path = np.linspace(start[:2], goal[:2], self.num_points)
        
        # Add some initial curvature to avoid obstacles
        self._optimize_path_shape(obstacles)
        
        # Initialize velocities and times
        self.velocities = np.zeros((self.num_points, 2))
        self.times = np.arange(self.num_points) * self.dt
    
    def _optimize_path_shape(self, obstacles):
        """Optimize path shape to avoid obstacles"""
        for iteration in range(50):
            forces = np.zeros_like(self.path)
            
            # Spring forces between consecutive points
            for i in range(1, len(self.path) - 1):
                # Force from previous point
                prev_force = self.k_spring * (self.path[i-1] - self.path[i])
                # Force from next point
                next_force = self.k_spring * (self.path[i+1] - self.path[i])
                forces[i] = prev_force + next_force
                
                # Damping force
                forces[i] -= self.k_damping * self.velocities[i]
            
            # Obstacle repulsion forces
            for i, point in enumerate(self.path):
                for obs in obstacles:
                    obs_x, obs_y, obs_radius = obs
                    diff = point - np.array([obs_x, obs_y])
                    distance = np.linalg.norm(diff)
                    
                    if distance < obs_radius + 1.0:  # Influence radius
                        if distance > 0:
                            repulsion_force = self.k_obstacle * (1.0 / distance**2) * (diff / distance)
                            forces[i] += repulsion_force
            
            # Update path and velocities
            self.path += forces * self.dt**2
            self.velocities += forces * self.dt
            
            # Keep start and goal fixed
            self.path[0] = np.array([self.path[0][0], self.path[0][1]])
            self.path[-1] = np.array([self.path[-1][0], self.path[-1][1]])
    
    def _optimize_timing(self, obstacles):
        """Optimize timing for each path segment"""
        for iteration in range(30):
            for i in range(len(self.path) - 1):
                # Current segment
                segment = self.path[i+1] - self.path[i]
                segment_length = np.linalg.norm(segment)
                
                if segment_length > 0:
                    # Desired velocity for this segment
                    desired_velocity = segment / self.dt
                    
                    # Check for obstacles in this segment
                    min_obstacle_dist = float('inf')
                    for obs in obstacles:
                        obs_x, obs_y, obs_radius = obs
                        # Distance from obstacle to line segment
                        dist = self._point_to_line_distance(
                            np.array([obs_x, obs_y]), self.path[i], self.path[i+1])
                        min_obstacle_dist = min(min_obstacle_dist, dist - obs_radius)
                    
                    # Adjust timing based on obstacles
                    if min_obstacle_dist < 0.5:
                        # Slow down near obstacles
                        self.dt *= 1.1
                    elif min_obstacle_dist > 1.0:
                        # Speed up when clear
                        self.dt *= 0.9
                    
                    # Ensure reasonable timing bounds
                    self.dt = np.clip(self.dt, 0.05, 0.5)
    
    def _point_to_line_distance(self, point, line_start, line_end):
        """Calculate distance from point to line segment"""
        line_vec = line_end - line_start
        point_vec = point - line_start
        line_len = np.linalg.norm(line_vec)
        
        if line_len == 0:
            return np.linalg.norm(point_vec)
        
        line_unitvec = line_vec / line_len
        proj_length = np.dot(point_vec, line_unitvec)
        proj_length = np.clip(proj_length, 0, line_len)
        
        closest_point = line_start + proj_length * line_unitvec
        return np.linalg.norm(point - closest_point)
    
    def update_path(self, current_position, obstacles):
        """Update path in real-time"""
        # Update current position
        self.path[0] = current_position[:2]
        
        # Re-optimize path
        self._optimize_path_shape(obstacles)
        self._optimize_timing(obstacles)
        
        # Update times
        self.times = np.arange(self.num_points) * self.dt
    
    def get_next_waypoint(self, current_position):
        """Get next waypoint on the path"""
        # Find closest point on path
        distances = [np.linalg.norm(point - current_position[:2]) for point in self.path]
        closest_idx = np.argmin(distances)
        
        # Get next point (with some lookahead)
        next_idx = min(closest_idx + 3, len(self.path) - 1)
        return self.path[next_idx]
    
    def visualize_path(self, obstacles, current_position=None):
        """Visualize the elastic band path"""
        fig, ax = plt.subplots(1, 1, figsize=(12, 10))
        
        # Draw obstacles
        for obs in obstacles:
            obs_x, obs_y, obs_radius = obs
            obstacle = Circle((obs_x, obs_y), obs_radius, color='red', alpha=0.7)
            ax.add_patch(obstacle)
        
        # Draw path
        ax.plot(self.path[:, 0], self.path[:, 1], 'b-', linewidth=2, label='TEB Path')
        ax.plot(self.path[:, 0], self.path[:, 1], 'bo', markersize=4)
        
        # Draw current position
        if current_position is not None:
            ax.plot(current_position[0], current_position[1], 'go', markersize=10, label='Current Position')
        
        # Draw velocity vectors
        for i in range(0, len(self.path)-1, 2):
            vel = self.velocities[i]
            if np.linalg.norm(vel) > 0.01:
                ax.arrow(self.path[i, 0], self.path[i, 1], 
                        vel[0]*0.5, vel[1]*0.5,
                        head_width=0.1, head_length=0.1, fc='green', ec='green')
        
        ax.set_xlim(-1, 6)
        ax.set_ylim(-1, 6)
        ax.set_aspect('equal')
        ax.grid(True, alpha=0.3)
        ax.legend()
        ax.set_xlabel('X Position (m)')
        ax.set_ylabel('Y Position (m)')
        ax.set_title('TEB Path Visualization')
        
        plt.tight_layout()
        plt.show()

# Example TEB usage
teb = TEBPlanner(robot)

# Initialize path
start = np.array([0, 0, 0])
goal = np.array([5, 5, 0])
obstacles = [(2, 2, 0.3), (3, 4, 0.4), (4, 1, 0.3)]

teb.initialize_path(start, goal, obstacles)
teb.visualize_path(obstacles)

Real-time Obstacle Avoidance System

Let’s create a comprehensive real-time obstacle avoidance system:

class RealTimeObstacleAvoidance:
    def __init__(self, planner_type='dwa', robot=None):
        """
        Initialize real-time obstacle avoidance system
        
        Args:
            planner_type: 'dwa' or 'teb'
            robot: Robot with dynamic constraints
        """
        self.planner_type = planner_type
        self.robot = robot or SimpleRobot()
        
        if planner_type == 'dwa':
            self.planner = RobotDWA()
        elif planner_type == 'teb':
            self.planner = TEBPlanner(self.robot)
        
        # Simulation parameters
        self.dt = 0.1
        self.current_position = np.array([0.0, 0.0, 0.0])
        self.goal = np.array([5.0, 5.0, 0.0])
        self.obstacles = []
        
        # History for visualization
        self.trajectory_history = []
    
    def add_obstacle(self, x, y, radius):
        """Add a dynamic obstacle"""
        self.obstacles.append((x, y, radius))
    
    def update_obstacles(self, time):
        """Update obstacle positions (for dynamic obstacles)"""
        # Example: moving obstacles
        if time < 5:
            # Obstacle 1 moves in a circle
            x = 2 + 0.5 * np.cos(time * 0.5)
            y = 2 + 0.5 * np.sin(time * 0.5)
            self.obstacles[0] = (x, y, 0.3)
        
        if time > 3:
            # Obstacle 2 appears and moves
            x = 1 + time * 0.3
            y = 4
            if len(self.obstacles) < 2:
                self.obstacles.append((x, y, 0.4))
            else:
                self.obstacles[1] = (x, y, 0.4)
    
    def step(self, time):
        """Single simulation step"""
        # Update dynamic obstacles
        self.update_obstacles(time)
        
        # Plan motion
        if self.planner_type == 'dwa':
            v, w = self.planner.plan_motion(
                self.current_position[0], self.current_position[1], self.current_position[2],
                self.goal[0], self.goal[1], self.obstacles, self.dt
            )
            
            # Update robot position
            self.current_position[0] += v * np.cos(self.current_position[2]) * self.dt
            self.current_position[1] += v * np.sin(self.current_position[2]) * self.dt
            self.current_position[2] += w * self.dt
            
        elif self.planner_type == 'teb':
            # Update TEB path
            self.planner.update_path(self.current_position, self.obstacles)
            
            # Get next waypoint
            next_waypoint = self.planner.get_next_waypoint(self.current_position)
            
            # Simple control to move toward waypoint
            direction = next_waypoint - self.current_position[:2]
            distance = np.linalg.norm(direction)
            
            if distance > 0.1:
                v = min(1.0, distance * 2.0)  # Proportional control
                angle_to_waypoint = np.arctan2(direction[1], direction[0])
                angle_diff = angle_to_waypoint - self.current_position[2]
                
                # Normalize angle difference
                while angle_diff > np.pi:
                    angle_diff -= 2 * np.pi
                while angle_diff < -np.pi:
                    angle_diff += 2 * np.pi
                
                w = np.clip(angle_diff * 2.0, -1.0, 1.0)  # Proportional control
                
                # Update robot position
                self.current_position[0] += v * np.cos(self.current_position[2]) * self.dt
                self.current_position[1] += v * np.sin(self.current_position[2]) * self.dt
                self.current_position[2] += w * self.dt
        
        # Store trajectory
        self.trajectory_history.append(self.current_position.copy())
        
        # Check if goal reached
        goal_distance = np.linalg.norm(self.current_position[:2] - self.goal[:2])
        return goal_distance < 0.3
    
    def simulate(self, duration=20.0):
        """Run simulation"""
        time = 0
        goal_reached = False
        
        while time < duration and not goal_reached:
            goal_reached = self.step(time)
            time += self.dt
            
            if int(time * 10) % 10 == 0:  # Print every second
                print(f"Time: {time:.1f}s, Position: ({self.current_position[0]:.2f}, {self.current_position[1]:.2f}), "
                      f"Distance to goal: {np.linalg.norm(self.current_position[:2] - self.goal[:2]):.2f}")
        
        return goal_reached, time
    
    def visualize_simulation(self):
        """Visualize simulation results"""
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(20, 8))
        
        # Plot trajectory
        if self.trajectory_history:
            trajectory = np.array(self.trajectory_history)
            ax1.plot(trajectory[:, 0], trajectory[:, 1], 'b-', linewidth=2, label='Robot Trajectory')
            ax1.plot(trajectory[0, 0], trajectory[0, 1], 'go', markersize=10, label='Start')
            ax1.plot(trajectory[-1, 0], trajectory[-1, 1], 'ro', markersize=10, label='End')
        
        # Plot obstacles at final state
        for obs in self.obstacles:
            obs_x, obs_y, obs_radius = obs
            obstacle = Circle((obs_x, obs_y), obs_radius, color='red', alpha=0.7)
            ax1.add_patch(obstacle)
        
        ax1.plot(self.goal[0], self.goal[1], 'r*', markersize=15, label='Goal')
        ax1.set_xlim(-1, 6)
        ax1.set_ylim(-1, 6)
        ax1.set_aspect('equal')
        ax1.grid(True, alpha=0.3)
        ax1.legend()
        ax1.set_xlabel('X Position (m)')
        ax1.set_ylabel('Y Position (m)')
        ax1.set_title(f'{self.planner_type.upper()} Simulation')
        
        # Plot distance to goal over time
        if self.trajectory_history:
            distances = [np.linalg.norm(pos[:2] - self.goal[:2]) for pos in self.trajectory_history]
            times = np.arange(len(distances)) * self.dt
            ax2.plot(times, distances, 'b-', linewidth=2)
            ax2.axhline(y=0.3, color='r', linestyle='--', label='Goal Threshold')
            ax2.set_xlabel('Time (s)')
            ax2.set_ylabel('Distance to Goal (m)')
            ax2.set_title('Distance to Goal Over Time')
            ax2.grid(True, alpha=0.3)
            ax2.legend()
        
        plt.tight_layout()
        plt.show()

# Run simulation
print("Running DWA simulation...")
dwa_system = RealTimeObstacleAvoidance('dwa')
dwa_system.add_obstacle(2, 2, 0.3)
dwa_system.add_obstacle(3, 4, 0.4)
goal_reached, final_time = dwa_system.simulate()
print(f"DWA simulation completed. Goal reached: {goal_reached}, Time: {final_time:.1f}s")
dwa_system.visualize_simulation()

print("\nRunning TEB simulation...")
teb_system = RealTimeObstacleAvoidance('teb')
teb_system.add_obstacle(2, 2, 0.3)
teb_system.add_obstacle(3, 4, 0.4)
teb_system.planner.initialize_path(
    np.array([0, 0, 0]), np.array([5, 5, 0]), teb_system.obstacles)
goal_reached, final_time = teb_system.simulate()
print(f"TEB simulation completed. Goal reached: {goal_reached}, Time: {final_time:.1f}s")
teb_system.visualize_simulation()

Advanced Techniques

1. Multi-robot Coordination

class MultiRobotCoordinator:
    def __init__(self, robots, workspace_bounds):
        """
        Initialize multi-robot coordinator
        
        Args:
            robots: List of robot objects
            workspace_bounds: (min_x, max_x, min_y, max_y)
        """
        self.robots = robots
        self.bounds = workspace_bounds
        self.formation_spacing = 1.0  # Desired spacing between robots
    
    def plan_formation_path(self, formation_center, formation_type='line'):
        """
        Plan formation path for multiple robots
        
        Args:
            formation_center: Center of formation
            formation_type: 'line', 'circle', 'grid'
        
        Returns:
            List of goal positions for each robot
        """
        goals = []
        
        if formation_type == 'line':
            # Line formation
            num_robots = len(self.robots)
            for i, robot in enumerate(self.robots):
                offset = (i - num_robots/2) * self.formation_spacing
                goal = formation_center + np.array([offset, 0])
                goals.append(goal)
        
        elif formation_type == 'circle':
            # Circular formation
            num_robots = len(self.robots)
            radius = self.formation_spacing * num_robots / (2 * np.pi)
            for i, robot in enumerate(self.robots):
                angle = 2 * np.pi * i / num_robots
                goal = formation_center + radius * np.array([np.cos(angle), np.sin(angle)])
                goals.append(goal)
        
        return goals
    
    def avoid_inter_robot_collision(self, robot_idx, trajectories):
        """Avoid collisions between robots"""
        robot_trajectory = trajectories[robot_idx]
        
        for other_idx, other_trajectory in enumerate(trajectories):
            if other_idx != robot_idx:
                # Check minimum distance between trajectories
                min_distance = float('inf')
                for i, point in enumerate(robot_trajectory):
                    for j, other_point in enumerate(other_trajectory):
                        distance = np.linalg.norm(point - other_point)
                        min_distance = min(min_distance, distance)
                
                # If too close, adjust trajectory
                if min_distance < self.formation_spacing:
                    # Simple repulsion
                    repulsion = (robot_trajectory[-1] - other_trajectory[-1])
                    if np.linalg.norm(repulsion) > 0:
                        repulsion = repulsion / np.linalg.norm(repulsion)
                        robot_trajectory[-1] += repulsion * 0.1
        
        return robot_trajectory

Key Takeaways

Next Steps

Tomorrow, we’ll explore robot control algorithms and motion control. We’ll implement PID controllers, model predictive control (MPC), and other advanced control techniques for precise robot movement.

Practice Exercise: Create a comprehensive simulation environment that combines dynamic path planning with realistic robot dynamics. Add support for different sensor types (LiDAR, cameras, ultrasonic) and implement sensor-based obstacle detection and avoidance. Test your system in various scenarios including multi-robot coordination and dynamic environments.