Lab 12: Path Planning and Execution¶


In [ ]:
%load_ext autoreload
%autoreload 2

import traceback
from notebook_utils import *
from Traj import *
import asyncio
import pathlib
import os
from utils import load_config_params
from localization_extras import Localization

# The imports below will only work if you copied the required ble-related python files 
# into the notebooks directory
from ble import get_ble_controller
from base_ble import LOG
from cmd_types import CMD
import numpy as np
from ble_rx_stream import *

# Setup Logger
LOG = get_logger('final_notebook.log')
LOG.propagate = False

# Init GUI and Commander
gui = GET_GUI()
cmdr = gui.launcher.commander
In [ ]:
# Start the plotter
START_PLOTTER()

The RealRobot class¶

Define the RealRobot class in the code cell below, based on the documentation and your real robot communication protocol.
This class is used by the Localization class to communicate with the real robot.
More specifically, the Localization class utilizes the RealRobot's member function perform_observation_loop() to get the 18 sensor readings and store them in its member variable obs_range_data, which is then utilized in the update step.

In [ ]:
class RealRobot():
    """A class to interact with the real robot
    """
    def __init__(self, commander, ble):
        # Load world config
        self.world_config = os.path.join(str(pathlib.Path(os.getcwd()).parent), "config", "world.yaml")
        
        self.config_params = load_config_params(self.world_config)
        
        # Commander to commuincate with the Plotter process
        # Used by the Localization module to plot odom and belief
        self.cmdr = commander

        # ArtemisBLEController to communicate with the Robot
        self.ble = ble
    
    def set_pose(self, pose):
        self.current_gt = pose

    def get_pose(self):
        """Get robot pose based on odometry
        
        Returns:
            current_odom -- Odometry Pose (meters, meters, degrees)
            current_gt   -- Ground Truth Pose (meters, meters, degrees)
        """
        return np.array([]), np.array(self.current_gt)

    async def perform_observation_loop(self, rot_vel=120):
        """Perform the observation loop behavior on the real robot, where the robot does  
        a 360 degree turn in place while collecting equidistant (in the angular space) sensor
        readings, with the first sensor reading taken at the robot's current heading. 
        The number of sensor readings depends on "observations_count"(=18) defined in world.yaml.
        
        Keyword arguments:
            rot_vel -- (Optional) Angular Velocity for loop (degrees/second)
                        Do not remove this parameter from the function definition, even if you don't use it.
        Returns:
            sensor_ranges   -- A column numpy array of the range values (meters)
            sensor_bearings -- A column numpy array of the bearings at which the sensor readings were taken (degrees)
                               The bearing values are not used in the Localization module, so you may return a empty numpy array
        """
        
        calibrate_s = 0.0
        max_angle = 360.0
        delta_angle = 20.0
        tolerance = 1.0
        min_pid = tolerance
        max_pid = delta_angle + 2 * tolerance
        min_motor = 100
        max_motor = 140
        k_p = -1.0
        k_i = -0.5
        k_d = 0.0
        self.ble.send_command(CMD.JUST_MAP, f"{int(calibrate_s*1000)}|{max_angle}|{delta_angle}|{tolerance}|{min_pid}|{max_pid}|{min_motor}|{max_motor}|{k_p}|{k_i}|{k_d}")
        
        await asyncio.sleep(30)
        
        fut = ble_rx_stream(ble, 'RX_STREAM')
        self.ble.send_command(CMD.DATA_MAP, "")
        stream_map = list(unpack_stream('LfH*', await fut))
        
        map_data = np.array(stream_map)
        # map_time = map_data[:, 0] / 1000
        # map_angl = map_data[:, 1]
        map_dist = map_data[:, 2] / 1000
        
        return map_dist[::-1][np.newaxis].T, np.array([])
In [ ]:
# Initialize RealRobot with a Commander object to communicate with the plotter process
# and the ArtemisBLEController object to communicate with the real robot
robot = RealRobot(cmdr, ble)

# Initialize mapper
# Requires a VirtualRobot object as a parameter
mapper = Mapper(robot)

# Initialize your BaseLocalization object
# Requires a RealRobot object and a Mapper object as parameters
loc = Localization(robot, mapper)

## Plot Map
cmdr.plot_map()
In [ ]:
waypoints = np.array([
    (-4, -3, 0),
    (-2, -1, 0),
    ( 1, -1, 0),
    ( 2, -3, 0),
    ( 5, -3, 0),
    ( 5, -2, 0),
    ( 5,  3, 0),
    ( 0,  3, 0),
    ( 0,  0, 0),
]) * 0.3048
# at each waypoint, point to the next one
waypoints[:-1,2] = np.degrees(np.arctan2(waypoints[1:,1] - waypoints[:-1,1], waypoints[1:,0] - waypoints[:-1,0]))
# last waypoint doesn't need to turn
waypoints[-1,2] = waypoints[-2,2]
In [ ]:
async def execute_and_observe(delta_rot_1, delta_trans, delta_rot_2):
    # First rotation
    calibrate_s = 0.0
    delta_angle = delta_rot_1
    tolerance = 1.0
    min_pid = tolerance
    max_pid = delta_angle + 2 * tolerance
    min_motor = 120
    max_motor = 160
    k_p = -1.0
    k_i = -0.5
    k_d = 0.0
    ble.send_command(CMD.ROTATE, f"{int(calibrate_s*1000)}|{delta_angle}|{tolerance}|{min_pid}|{max_pid}|{min_motor}|{max_motor}|{k_p}|{k_i}|{k_d}")
    
    # Translation
    delta_distance_m = delta_trans
    tolerance_mm = 50
    min_pid = tolerance_mm
    max_pid = delta_distance_m + 2 * tolerance_mm
    min_motor = 60
    max_motor = 100
    k_p = 1.0
    k_i = 0.0
    k_d = 0.0
    ble.send_command(CMD.DRIVE, f"{int(delta_distance_m*1000)}|{int(tolerance_mm)}|{min_pid}|{max_pid}|{min_motor}|{max_motor}|{k_p}|{k_i}|{k_d}")
    
    # Second rotation
    calibrate_s = 0.0
    delta_angle = delta_rot_2
    tolerance = 1.0
    min_pid = tolerance
    max_pid = delta_angle + 2 * tolerance
    min_motor = 120
    max_motor = 160
    k_p = -1.0
    k_i = -0.5
    k_d = 0.0
    ble.send_command(CMD.ROTATE, f"{int(calibrate_s*1000)}|{delta_angle}|{tolerance}|{min_pid}|{max_pid}|{min_motor}|{max_motor}|{k_p}|{k_i}|{k_d}")
    
    # Get Observation Data by executing a 360 degree rotation motion
    await loc.get_observation_data()
In [ ]:
# Reset Plots
cmdr.reset_plotter()

# Calibrate gyroscope
calibrate_s = 1.5
ble.send_command(CMD.CALIBRATE, f"{int(calibrate_s*1000)}")

# Init Point Mass Belief at start point
current_gt = np.array([waypoints[0,0], waypoints[0,1], 0])
loc.init_point_mass_distribution(*current_gt)
robot.set_pose(current_gt)

# Get Observation Data by executing a 360 degree rotation motion
await loc.get_observation_data()

# Run Update Step
loc.update_step()
pos_error = loc.print_update_stats(plot_data=False)

# Plot Belief and GT (i.e. waypoint)
current_bel = current_gt - pos_error
cmdr.plot_gt(current_gt[0], current_gt[1])
cmdr.plot_bel(current_bel[0], current_bel[1])
In [ ]:
# Run through each motion steps
for t in range(1, 9):
    print("\n\n-----------------", t, "-----------------")
    
    current_gt = waypoints[t,:]
    robot.set_pose(current_gt)
    
    # Create a turn-go-turn plan, Move the robot along the plan, and get Observation data
    plan = loc.compute_control(current_gt, current_bel)
    observation_fut = asyncio.create_task(execute_and_observe(*plan))
    await asyncio.sleep(0) # allow scheduler to run
    
    # Prediction Step
    loc.prediction_step(current_gt, current_bel)
    loc.print_prediction_stats(plot_data=False)
    
    # Update Step
    await observation_fut
    loc.update_step()
    pos_error = loc.print_update_stats(plot_data=False)
    current_bel = current_gt - pos_error
    cmdr.plot_gt(current_gt[0], current_gt[1])
    cmdr.plot_bel(current_bel[0], current_bel[1])

# Uncomment the below line to wait for keyboard input between each iteration.
#   input("Press Enter to Continue")
        
    print("-------------------------------------")

Just PID Control¶

In [ ]:
# Calibrate gyroscope
calibrate_s = 1.5
ble.send_command(CMD.CALIBRATE, f"{int(calibrate_s*1000)}")

# Init Point Mass Belief at start point
current_gt = np.array([waypoints[0,0], waypoints[0,1], 0])
robot.set_pose(current_gt)

current_bel = current_gt
In [ ]:
# Run through each motion steps
for t in range(1, 9):
    print("\n\n-----------------", t, "-----------------")
    
    current_gt = waypoints[t,:]
    robot.set_pose(current_gt)
    
    # Create a turn-go-turn plan, Move the robot along the plan, and get Observation data
    delta_rot_1, delta_trans, delta_rot_2 = loc.compute_control(current_gt, current_bel)
    
    # First rotation
    calibrate_s = 0.0
    delta_angle = delta_rot_1
    tolerance = 1.0
    min_pid = tolerance
    max_pid = delta_angle + 2 * tolerance
    min_motor = 100
    max_motor = 140
    k_p = 1.0
    k_i = 0.5
    k_d = 0.0
    ble.send_command(CMD.ROTATE, f"{int(calibrate_s*1000)}|{delta_angle}|{tolerance}|{min_pid}|{max_pid}|{min_motor}|{max_motor}|{k_p}|{k_i}|{k_d}")
    
    # Translation
    delta_distance_m = delta_trans
    tolerance_mm = 50
    min_pid = tolerance_mm
    max_pid = delta_distance_m * 1000 + 2 * tolerance_mm
    min_motor = 80
    max_motor = 100
    k_p = 1.0
    k_i = 0.0
    k_d = 0.0
    ble.send_command(CMD.DRIVE, f"{int(delta_distance_m*1000)}|{int(tolerance_mm)}|{min_pid}|{max_pid}|{min_motor}|{max_motor}|{k_p}|{k_i}|{k_d}")
    
    # Second rotation
    calibrate_s = 0.0
    delta_angle = delta_rot_2
    tolerance = 1.0
    min_pid = tolerance
    max_pid = delta_angle + 2 * tolerance
    min_motor = 100
    max_motor = 140
    k_p = 1.0
    k_i = 0.5
    k_d = 0.0
    ble.send_command(CMD.ROTATE, f"{int(calibrate_s*1000)}|{delta_angle}|{tolerance}|{min_pid}|{max_pid}|{min_motor}|{max_motor}|{k_p}|{k_i}|{k_d}")
    
    current_bel = current_gt

# Uncomment the below line to wait for keyboard input between each iteration.
    input("Press Enter to Continue")
        
    print("-------------------------------------")

ble.write(ble.uuid['TX_PWM_LONG'], (0).to_bytes(4, 'little'))

Step-by-Step¶

In [ ]:
calibrate_s = 1.0
delta_angle = 45
tolerance = 1.0
min_pid = tolerance
max_pid = delta_angle + 2 * tolerance
min_motor = 100
max_motor = 140
k_p = 1.0
k_i = 0.5
k_d = 0.0
ble.send_command(CMD.ROTATE, f"{int(calibrate_s*1000)}|{delta_angle}|{tolerance}|{min_pid}|{max_pid}|{min_motor}|{max_motor}|{k_p}|{k_i}|{k_d}")
In [ ]:
delta_distance_m = 0.6
tolerance_mm = 50
min_pid = tolerance_mm
max_pid = delta_distance_m * 1000 + 2 * tolerance_mm
min_motor = 80
max_motor = 100
k_p = 1.0
k_i = 0.0
k_d = 0.0
ble.send_command(CMD.DRIVE, f"{int(delta_distance_m*1000)}|{int(tolerance_mm)}|{min_pid}|{max_pid}|{min_motor}|{max_motor}|{k_p}|{k_i}|{k_d}")
In [ ]:
calibrate_s = 1.0
delta_angle = -45
tolerance = 1.0
min_pid = tolerance
max_pid = delta_angle + 2 * tolerance
min_motor = 100
max_motor = 140
k_p = 1.0
k_i = 0.5
k_d = 0.0
ble.send_command(CMD.ROTATE, f"{int(calibrate_s*1000)}|{delta_angle}|{tolerance}|{min_pid}|{max_pid}|{min_motor}|{max_motor}|{k_p}|{k_i}|{k_d}")

Open Loop Control¶

In [ ]:
def send_motor(ble, fwd_1=0, bck_1=0, fwd_2=0, bck_2=0):
    return ble.write(ble.uuid['TX_PWM_LONG'], bytearray([fwd_1, bck_1, fwd_2, bck_2]))

def go_fwd(power, delay):
    send_motor(ble, power, 0, power, 0)
    time.sleep(delay)
    send_motor(ble, 0, 0, 0, 0)
def go_bck(power, delay):
    send_motor(ble, 0, power, 0, power)
    time.sleep(delay)
    send_motor(ble, 0, 0, 0, 0)
def turn_r(power, delay):
    send_motor(ble, 0, power, power, 0)
    time.sleep(delay)
    send_motor(ble, 0, 0, 0, 0)
def turn_l(power, delay):
    send_motor(ble, power, 0, 0, power)
    time.sleep(delay)
    send_motor(ble, 0, 0, 0, 0)
In [ ]:
go_fwd(80, 0.3)
In [ ]:
turn_l(130, 0.2)
In [ ]:
turn_r(130, 0.2)