Friday 10 June 2016

Lesson 10 - Self-driving vehicle

EV3 Direct commands - Lesson 08

Introduction

The last three lessons were about classes, which organize tasks. This lesson is another one about class TwoWheelVehicle, that represents vehicles with two drived wheels. We will make it compatible to the task concept At its end we will realize a vehicle, that follows a course and immediately stops, when its sensor detects a barrier and continues the driving, when the barrier disappeares. The whole algorithm of this self driving vehicle will be a task object.

This all will be done step by step. We train our vehicle to stop immediately and to continue correct. At the end we will construct tasks of a new level of complexity.

The task factory

We add some constants to class TwoWheelVehicle:


DRIVE_TYPE_STRAIGHT = "straight"
DRIVE_TYPE_TURN = "turn"
DRIVE_TYPE_ROTATE_TO = "rotate_to"
DRIVE_TYPE_DRIVE_TO = "drive_to"
DRIVE_TYPE_STOP = "stop"
      
We add a new method:

    def task_factory(self, drive_type: str, **kwargs) -> task.Task:
        speed = kwargs.pop('speed', None)
        distance = kwargs.pop('distance', None)
        radius_turn = kwargs.pop('radius_turn', None)
        angle = kwargs.pop('angle', None)
        right_turn = kwargs.pop('right_turn', False)
        orientation = kwargs.pop('orientation', None)
        pos_x = kwargs.pop('pos_x', None)
        pos_y = kwargs.pop('pos_y', None)
        brake = kwargs.pop('brake', False)
        exc = kwargs.pop('exc', None)
        if drive_type == DRIVE_TYPE_STRAIGHT:
            return task.Task(
                self.drive_straight,
                args=(speed, distance),
                exc=exc
            )
        elif drive_type == DRIVE_TYPE_TURN:
            return task.Task(
                self.drive_turn,
                args=(speed, radius_turn, angle, right_turn),
                exc=exc
            )
        elif drive_type == DRIVE_TYPE_ROTATE_TO:
            return task.Task(
                self.rotate_to,
                args=(speed, orientation),
                exc=exc
            )
        elif drive_type == DRIVE_TYPE_DRIVE_TO:
            return task.Task(
                self.drive_to,
                args=(speed, pos_x, pos_y),
                exc=exc
            )
        elif drive_type == DRIVE_TYPE_STOP:
            return task.Task(
                self.stop,
                args=(brake,),
                exc=exc
            )
      
We test it:

#!/usr/bin/env python3

import ev3, ev3_vehicle, task, time

vehicle = ev3_vehicle.TwoWheelVehicle(
    0.02128,                 # radius_wheel
    0.1175,                  # tread
    protocol=ev3.BLUETOOTH,
    host='00:16:53:42:2B:99'
)
speed = 25

drive = task.concat(
    vehicle.task_factory("turn", speed, radius_turn=0.25, angle=60),
    vehicle.task_factory("straight", speed, distance=0.2),
    vehicle.task_factory("rotate_to", speed, orientation=0),
    vehicle.task_factory("stop", brake=True),
    task.Sleep(0.5),
    vehicle.task_factory("stop", brake=False)
)
drive.start()
      
It works, the vehicle does its three moves, then stops hard (with brake). drive.start() returns immediately, which allows to start something parallel. But what happens, when we call method stop? Task objects don't know, that vehicles need an explicit call of method stop.

Stopping a vehicle

We change method task_factory:


    def task_factory(self, drive_type: str, **kwargs) -> task.Task:
        speed = kwargs.pop('speed', None)
        distance = kwargs.pop('distance', None)
        radius_turn = kwargs.pop('radius_turn', None)
        angle = kwargs.pop('angle', None)
        right_turn = kwargs.pop('right_turn', False)
        orientation = kwargs.pop('orientation', None)
        pos_x = kwargs.pop('pos_x', None)
        pos_y = kwargs.pop('pos_y', None)
        brake = kwargs.pop('brake', False)
        exc = kwargs.pop('exc', None)
        if drive_type == DRIVE_TYPE_STRAIGHT:
            t = task.Task(
                self.drive_straight,
                args=(speed, distance),
                action_stop=self.stop,
                exc=exc
            )
        elif drive_type == DRIVE_TYPE_TURN:
            t = task.Task(
                self.drive_turn,
                args=(speed, radius_turn, angle, right_turn),
                action_stop=self.stop,
                exc=exc
            )
        elif drive_type == DRIVE_TYPE_ROTATE_TO:
            t = task.Task(
                self.rotate_to,
                args=(speed, orientation),
                action_stop=self.stop,
                exc=exc
            )
        elif drive_type == DRIVE_TYPE_DRIVE_TO:
            t = task.Task(
                self.drive_to,
                args=(speed, pos_x, pos_y),
                action_stop=self.stop,
                exc=exc
            )
        elif drive_type == DRIVE_TYPE_STOP:
            t = task.Task(
                self.stop,
                args=(brake,),
                exc=exc
            )
        return task.Task(t.start, join=True, exc=exc)
      
This makes all movements to contained tasks. They are root tasks and will stay root tasks, even when the returned task object is appended to another task object. We already discussed it in lesson 9.

We modify the test program:


#!/usr/bin/env python3

import ev3, ev3_vehicle, task, time

vehicle = ev3_vehicle.TwoWheelVehicle(
    0.02128,                 # radius_wheel
    0.1175,                  # tread
    protocol=ev3.BLUETOOTH,
    host='00:16:53:42:2B:99'
)
speed = 25

drive = task.concat(
    vehicle.task_factory("turn", speed, radius_turn=0.25, angle=60),
    vehicle.task_factory("straight", speed, distance=0.2),
    vehicle.task_factory("rotate_to", speed, orientation=0),
    vehicle.task_factory("stop", brake=True),
    task.Sleep(0.5),
    vehicle.task_factory("stop", brake=False)
)
drive.start()
time.sleep(1)
drive.stop()
      
It turns, then stops, this is, what we expected. The call of method stop immediately returns, but the movement lasts longer. This type of stopping is not what we target. The perfect stopping reacts prompt.

Fast stopping of drive_straight

Actually, the smallest unit of our tasks are the methods drive_straight, drive_turn, rotate_to and drive_to. If we want a fast stopping, we need to split them up into smaller units. F.i. we can split up drive_straight into a chain of tasks, which consits of a Task object and a Repeated object. This allows to stop while Repeated executes with the option to continue it. First we take a look into the actual code of method drive_straight:


    def drive_straight(self, speed: int, distance: float) -> None:
        ...
        self.move_straight(speed)
        step = round(distance * 360 / (2 * math.pi * self._radius_wheel))
        direction = math.copysign(1, speed * self._polarity)
        pos_final = [self._pos[0] + direction * step,
                     self._pos[1] + direction * step]
        while True:
            value = self._test_pos(direction, pos_final)
            if value == -1:
                break
            time.sleep(value)
      
It seems easy to realize the lower part with a Repeated object. We will code a chain of tasks, the first part will be a Task object, the second a Repeated. But we need to solve the problem with the arguments of method _test_pos. They are not known, when the Repeated task is constructed. Therefore we add a new attribute to class TwoWheelVehicle:

    def __init__(self, radius_wheel: float, tread: float,
                 protocol: str=None,
                 host: str=None,
                 ev3_obj: ev3.EV3=None):
        super().__init__(protocol=protocol,
                         host=host,
                         ev3_obj=ev3_obj)
        ...
        self._test_args = None
      

We change method _test_pos from:


    def _test_pos(
            self,
            direction,
            pos_final: list
    ) -> float:
        if self._to_stop:
        ...
      
to:

    def _test_pos(self) -> float:
        (direction, pos_final) = self._test_args
        if self._to_stop:
        ...
      
and method drive_straight to:

    def drive_straight(self, speed: int, distance: float) -> None:
        ...
        self._test_pos_args = (direction, pos_final)
        while True:
            value = self._test_pos()
            if value == -1:
                break
            time.sleep(value)
      
This solves the problem with the arguments! Now we can write a new method _drive_straight:

    def _drive_straight(self, speed: int, distance: float) -> None:
        self.move_straight(speed)
        step = round(distance * 360 / (2 * math.pi * self._radius_wheel))
        direction = math.copysign(1, speed * self._polarity)
        pos_final = [self._pos[0] + direction * step,
                     self._pos[1] + direction * step]
        self._test_pos_args = (direction, pos_final)
      
and we modify method task_factory:

    def task_factory(
            ...
    ) -> task.Task:
        ...
        if drive_type == DRIVE_TYPE_STRAIGHT:
            t = task.Task(
                self._drive_straight,
                args=(speed, distance),
                action_stop=self.stop,
                exc=exc
            )
            if distance != None:
                t.append(task.Repeated(self._test_pos))
        ...
      
The first part, the Task object, calls method _drive_straight, which starts the movement and calculates the arguments of method _test_pos. The second part is a Repeated, that ends, when the final position is reached.

We test the new code with this program:


#!/usr/bin/env python3

import ev3, ev3_vehicle, task, time

vehicle = ev3_vehicle.TwoWheelVehicle(
    0.02128,                 # radius_wheel
    0.1175,                  # tread
    protocol=ev3.BLUETOOTH,
    host='00:16:53:42:2B:99'
)
vehicle.verbosity = 1
speed = 25

drive = vehicle.task_factory("straight", speed, distance=0.5)
drive.start()
time.sleep(1)
drive.stop()
      
its output:

21:22:47.812922 Sent 0x|1F:00|2A:00|00|08:00|B0:00:09:19:00:00:00:A6:00:09:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
21:22:47.860821 Recv 0x|0B:00|2A:00|02|B0:0C:00:00:C0:0C:00:00|
21:22:47.962883 Sent 0x|15:00|2B:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
21:22:48.004716 Recv 0x|0B:00|2B:00|02|CC:0C:00:00:DC:0C:00:00|
21:22:48.250600 Sent 0x|15:00|2C:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
21:22:48.300776 Recv 0x|0B:00|2C:00|02|17:0D:00:00:27:0D:00:00|
21:22:48.814450 Sent 0x|19:00|2D:00|00|08:00|A3:00:09:00:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
21:22:48.876683 Recv 0x|0B:00|2D:00|02|AC:0D:00:00:BB:0D:00:00|
      
The stopping occured one second after the start of the movement, not when the whole distance was moved. Fast stopping is realized!

Continuing drive_straight

When we call method stop, we stop the motors. We have to restart them, when we continue. This is, what the task's argument action_cont is thought for. If you look inside method _test_pos, you will realize a second problem. This method calculates the remaining time from the time gap since its last call. Stopping and continuing will disturb this algorithm. Therefore continuation needs to act like the start of a new movement. We add a new attribute to class TwoWheelVehicle:


    def __init__(self, radius_wheel: float, tread: float,
                 protocol: str=None,
                 host: str=None,
                 ev3_obj: ev3.EV3=None):
        ...
        self._turn = None
        self._speed = None
        ...
      

We add a line to method move:


    def move(self, speed: int, turn: int) -> None:
        ...
        self._turn = turn
        self._speed = speed
        self._moves = True
      

We add the following method:


    def _vehicle_cont(self):
        self.move(self._speed, self._turn)
        self._to_stop = False
        self._last_t = None
      
Setting self._last_t = None demands the loop to act like a new movement.

We modify method task_factory:


        if drive_type == DRIVE_TYPE_STRAIGHT:
            t = task.Task(
                self._drive_straight,
                args=(speed, distance),
                action_stop=self.stop,
                action_cont=self._vehicle_cont,
                exc=exc
            )
            if distance != None:
                t.append(task.Repeated(self._test_pos))
      
This will call method _vehicle_cont when the task continues.

We change method drive_straight to start a task and join it:


    def drive_straight(self, speed: int, distance: float) -> None:
        self.task_factory(
            DRIVE_TYPE_STRAIGHT,
            speed,
            distance=distance
        ).start().join()
      
This helps to prevent double code. We started with a factory that called method drive_straight. Now we end with the opposite situation, method drive_straight calls the factory!

We test the continuation with this program:


#!/usr/bin/env python3

import ev3, ev3_vehicle, task, time, datetime

vehicle = ev3_vehicle.TwoWheelVehicle(
    0.02128,                 # radius_wheel
    0.1175,                  # tread
    protocol=ev3.BLUETOOTH,
    host='00:16:53:42:2B:99'
)
vehicle.verbosity = 1

speed = 25
drive = task.concat(
    vehicle.task_factory("straight", speed=speed, distance=0.5),
    vehicle.task_factory("stop", brake=True),
    task.Sleep(0.5),
    vehicle.task_factory("stop")
)

drive.start()
time.sleep(3)
drive.stop()
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(now, "*** called stop ***")
time.sleep(2)
drive.cont()
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(now, "*** called cont ***")
      
As you can see, the program consits of three parts:
  • creating and administrating class TwoWheelVehicle,
  • constructing task drive (a chain of tasks),
  • opearating task drive (needs no knowledge of the tasks inner structure).

its output:


09:17:04.074805 Sent 0x|1F:00|2A:00|00|08:00|B0:00:09:19:00:00:00:A6:00:09:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:04.124579 Recv 0x|0B:00|2A:00|02|78:0F:00:00:7B:0F:00:00|
09:17:04.226509 Sent 0x|15:00|2B:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:04.294573 Recv 0x|0B:00|2B:00|02|9D:0F:00:00:A0:0F:00:00|
09:17:04.566396 Sent 0x|15:00|2C:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:04.630528 Recv 0x|0B:00|2C:00|02|F4:0F:00:00:F5:0F:00:00|
09:17:05.237855 Sent 0x|15:00|2D:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:05.313547 Recv 0x|0B:00|2D:00|02|9F:10:00:00:A2:10:00:00|
09:17:06.604481 Sent 0x|15:00|2E:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:06.671509 Recv 0x|0B:00|2E:00|02|FA:11:00:00:FD:11:00:00|
09:17:07.077801 *** called stop ***
09:17:07.078564 Sent 0x|19:00|2F:00|00|08:00|A3:00:09:00:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:07.286495 Recv 0x|0B:00|2F:00|02|97:12:00:00:9A:12:00:00|
09:17:09.081348 *** called cont ***
09:17:10.608654 Sent 0x|1F:00|30:00|00|08:00|B0:00:09:19:00:00:00:A6:00:09:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:10.667424 Recv 0x|0B:00|30:00|02|C4:12:00:00:C6:12:00:00|
09:17:10.769170 Sent 0x|15:00|31:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:10.812384 Recv 0x|0B:00|31:00|02|E4:12:00:00:E5:12:00:00|
09:17:11.058759 Sent 0x|15:00|32:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:12.015337 Recv 0x|0B:00|32:00|02|17:14:00:00:18:14:00:00|
09:17:12.017042 Sent 0x|19:00|33:00|00|08:00|A3:00:09:01:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:12.060297 Recv 0x|0B:00|33:00|02|21:14:00:00:22:14:00:00|
09:17:12.563204 Sent 0x|19:00|34:00|00|08:00|A3:00:09:00:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:12.613220 Recv 0x|0B:00|34:00|02|21:14:00:00:22:14:00:00|
      
At a first look it works as designed! When continued, it again has small time gaps between the readings of the wheels positions, which then grow. A closer look at the times shows, that the continuation doesn't occur 2 sec. after the stopping. There is a gap of about 1.5 sec. between the call of method cont and the sending of the direct command, that continues the movement. This is worth to analyse.

Subclassing Task

We remember how Task objects are contined. Attribute _time_action holds the time of the next execution of their action. Method _cont2 determines the time of the next action of the continued task itself and all its contained tasks. When calling method _cont2 of the contained tasks it sets argument time_delta so that the old synchronization is reconstructed. This was fine for most situations but method sound of class Jukebox subclassed Task to realize the correct timing. In our situation, once more the standatrd is not what we need. Our continuation of the Repeated object is more the start of a new movement. This says, we again have to manipulate attribute _time_action to get the correct result. We do it the following way:


    def task_factory(self, drive_type: str, **kwargs) -> task.Task:
        ...
        class _Drive(task.Task):
            def stop(self):
                super().stop()
                self._time_action = time.time()

        if drive_type == DRIVE_TYPE_STRAIGHT:
            t = _Drive(
                    self._drive_straight,
                    args=(speed, distance),
                    action_stop=self.stop,
                    action_cont=self._vehicle_cont,
                    exc=exc
            )
            if distance != None:
                t.append(task.Repeated(self._test_pos))
        ...
      
Class _Drive is a subclass of task.Task. We modify the root task and not task.Repeated because method stop always is called via the root task. The only modification of _Drive objects is that their method stop sets _time_action to the actual time. I repeated the test and got this output:

09:45:54.792138 Sent 0x|1F:00|2A:00|00|08:00|B0:00:09:19:00:00:00:A6:00:09:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:54.837971 Recv 0x|0B:00|2A:00|02|21:14:00:00:22:14:00:00|
09:45:54.939915 Sent 0x|15:00|2B:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:54.983938 Recv 0x|0B:00|2B:00|02|40:14:00:00:40:14:00:00|
09:45:55.231559 Sent 0x|15:00|2C:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:55.309953 Recv 0x|0B:00|2C:00|02|8A:14:00:00:8B:14:00:00|
09:45:55.883439 Sent 0x|15:00|2D:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:55.936879 Recv 0x|0B:00|2D:00|02|35:15:00:00:36:15:00:00|
09:45:57.137283 Sent 0x|15:00|2E:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:57.186836 Recv 0x|0B:00|2E:00|02|74:16:00:00:74:16:00:00|
09:45:57.795134 *** called stop ***
09:45:57.795901 Sent 0x|19:00|2F:00|00|08:00|A3:00:09:00:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:57.856792 Recv 0x|0B:00|2F:00|02|20:17:00:00:20:17:00:00|
09:45:59.798555 *** called cont ***
09:45:59.862278 Sent 0x|1F:00|30:00|00|08:00|B0:00:09:19:00:00:00:A6:00:09:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:59.917685 Recv 0x|0B:00|30:00|02|3E:17:00:00:3F:17:00:00|
09:46:00.019364 Sent 0x|15:00|31:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:46:00.396673 Recv 0x|0B:00|31:00|02|AE:17:00:00:AF:17:00:00|
09:46:00.977447 Sent 0x|15:00|32:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:46:01.392575 Recv 0x|0B:00|32:00|02|AF:18:00:00:AF:18:00:00|
09:46:01.635565 Sent 0x|19:00|33:00|00|08:00|A3:00:09:01:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:46:01.693532 Recv 0x|0B:00|33:00|02|FB:18:00:00:FC:18:00:00|
09:46:02.196212 Sent 0x|19:00|34:00|00|08:00|A3:00:09:00:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:46:02.428546 Recv 0x|0B:00|34:00|02|FB:18:00:00:FB:18:00:00|
      
This is what I hoped to see!

Method drive_turn

I think, it's straight forward to code the changes of methods drive_turn, test_o and task_factory. At the end, it behaves like method drive_straight.

Method rotate_to

Actually method rotate_to calls method drive_turn. We have to change that. Once again we don't know the arguments, when the task is constructed. We code a new method _rotate_to:


    def _rotate_to(self, speed: int, orientation: float) -> None:
        diff = orientation - self._orientation
        diff += 180
        diff %= 360
        diff -= 180
        if diff >= 0:
            right_turn = False
            direction = 1
        else:
            right_turn = True
            direction = -1
        if abs(diff) >= 1:
            o_orig = self._orientation
            self.move_turn(speed, radius_turn=0, right_turn=right_turn)
            if diff > 0 and self._orientation > o_orig + diff:
                diff += 360
            elif diff < 0 and self._orientation < o_orig + diff:
                diff -= 360
        delta_pos = self._polarity * diff * 0.5 * self._tread / self._radius_wheel
        final_pos = [self._pos[0] - delta_pos,
                     self._pos[1] + delta_pos]
        self._test_args = (direction, o_orig + diff, final_pos)
     
You see the analogy with method _drive_straight. We start the movement and we calculate the arguments of method _test_o.

We modify method task_factory:


        ...
        elif drive_type == DRIVE_TYPE_ROTATE_TO:
            t = task.concat(
                _Drive(
                    self._rotate_to,
                    args=(speed, orientation),
                    action_stop=self.stop,
                    action_cont=self._vehicle_cont,
                    exc=exc
                ),
                task.Repeated(self._test_o)
            )
        ...
      

And we let method rotate_to call the task factory:


    def rotate_to(self, speed: int, orientation: float) -> None:
        self.task_factory(
            DRIVE_TYPE_ROTATE_TO,
            speed,
            orientation=orientation
        ).start().join()
      

Method drive_to

Method drive_to combines a turn with a straight movement. Again we don't know the arguments, which says we can't call methods rotate_to or drive_straight, but we will call _rotate_to and _drive_straight. Altogether we construct a chain of tasks with four links. We modify method task_factory:


 ...
        elif drive_type == DRIVE_TYPE_DRIVE_TO:
            t = task.concat(
                _Drive(
                    self._drive_to_1,
                    args=(speed, pos_x, pos_y),
                    action_stop=self.stop,
                    action_cont=self._vehicle_cont,
                    exc=exc,
                ),
                task.Repeated(self._test_o),
                task.Task(
                    self._drive_to_2,
                    args=(speed, pos_x, pos_y)
                ),
                task.Repeated(self._test_pos)
            )
        ...
      
Method _drive_to_1 will start the turn movement and calculate the final orientation. Method _drive_to_2 starts a straight movement and calculates the final position of the wheels.

We code the new method _drive_to_1:


    def _drive_to_1(
            self,
            speed: int,
            pos_x: float,
            pos_y: float
    ) -> None:
        diff_x = pos_x - self._pos_x
        diff_y = pos_y - self._pos_y
        dist = math.sqrt(diff_x**2 + diff_y**2)
        if abs(diff_x) > abs(diff_y):
            direct = math.degrees(math.atan(diff_y/diff_x))
        else:
            fract = diff_x / diff_y
            sign = math.copysign(1.0, fract)
            direct = sign * 90 - math.degrees(math.atan(fract))
        if diff_x < 0:
            direct += 180
        self._rotate_to(speed, direct)
      

And a new method _drive_to_2:


    def _drive_to_2(
            self,
            speed: int,
            pos_x: float,
            pos_y: float
    ) -> None:
        diff_x = pos_x - self._pos_x
        diff_y = pos_y - self._pos_y
        dist = math.sqrt(diff_x**2 + diff_y**2)
        self._drive_straight(speed, dist)
      

Again method drive_to calls the factory:


    def drive_to(
            self,
            speed: int,
            pos_x: float,
            pos_y: float
    ) -> None:
        self.task_factory(
            DRIVE_TYPE_DRIVE_TO,
            speed,
            pos_x=pos_x,
            pos_y=pos_y
        ).start().join()
      
This was it! All the movements of the vehicle stop prompt and can be continued.

Self driving vehicles

We use the Follow Me vehicle (lesson 5) with its infrared sensor. We want the vehicle to follow a course, but stop, when it detects a barrier. When the obstacle disappeares, the vehicle again follows its course (two times the shape of the digit eight). This is not, what Google or the car manufacturers mean by self driving, but it is what we can achieve with about 80 lines of code.


#!/usr/bin/env python3

import ev3, ev3_vehicle, task, struct
sensor = ev3.EV3(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
vehicle = ev3_vehicle.TwoWheelVehicle(0.02128, 0.1175, ev3_obj=sensor)

speed = 50
sensor_gap = 0.1

def look_forward(cont: bool=False):
    ops = b''.join([
        ev3.opInput_Device,
        ev3.READY_RAW,
        ev3.LCX(0),          # LAYER
        ev3.LCX(1),          # NO
        ev3.LCX(33),         # TYPE - EV3-IR
        ev3.LCX(0),          # MODE - Proximity
        ev3.LCX(1),          # VALUES
        ev3.GVX(0)           # VALUE1
    ])
    reply = sensor.send_direct_cmd(ops, global_mem=4)
    dist = struct.unpack('<i', reply[5:])[0]
    if dist < 10 and not cont:
        return True
    elif dist >= 10 and cont:
        return True

t_drive = task.concat(
    task.Repeated(
        task.concat(
            vehicle.task_factory(
                "turn",
                speed=speed,
                radius_turn=0.25,
                angle=360
            ),
            vehicle.task_factory(
                "turn",
                speed=speed,
                radius_turn=-0.25,
                angle=360
            )
        ).start,
        num=2
    ),
    vehicle.task_factory("stop", brake=True),
    task.Sleep(0.5),
    vehicle.task_factory("stop")
)
t_controller = task.concat(
    task.Task(t_drive.start),
    task.Periodic(
        sensor_gap,
        look_forward
    ),
    task.Task(t_drive.stop),
    task.Repeated(
        task.concat(
            task.Periodic(
                sensor_gap,
                look_forward,
                kwargs={"cont": True}
            ),
            task.Task(t_drive.cont, args=(0,)),
            task.Periodic(
                sensor_gap,
                look_forward
            ),
            task.Task(t_drive.stop)
        ).start
    )
)
t_drive = task.concat(
    t_drive,
    task.Task(t_controller.stop)
)
t_controller.start()
      
Again we see this checks and balances behaviour of two tasks. Task t_controller starts, stops and continues task t_drive, but it is t_drive that finally stops t_controller, when the course ends.

The interesting part is task t_controller. This happens with task t_drive: start, stop, then an unlimited number of cont and stop. It's important, that the iteration of Repeated ends with stop. Let's look at an alternative formulation:


t_controller = task.concat(
    task.Task(t_drive.start),
    task.Repeated(
        task.concat(
            task.Periodic(
                sensor_gap,
                look_forward
            ),
            task.Task(t_drive.stop),
            task.Periodic(
                sensor_gap,
                look_forward,
                kwargs={"cont": True}
            ),
            task.Task(t_drive.cont, args=(0,))
        ).start
    )
)
 
This version stops and continues once and never again. It's because the contained task t_drive.cont is joined until the driving ends and this prevents the next iteration of the surrounding Repeated.

Conclusion

The self driving vehicle demonstrates a mechanism of regulation by a controller, that reads sensor values. The task concept has its own coding patterns and it needs some experience to get familiar with it. Coding tasks is not simple, but it helps for an incremental working, where complexity grows step by step. When the construction of the tasks is done, they hide the compexity behind an API which always keeps its simple surface to the outside world.

Recursion makes stopping and continuation to impressive methods. The optional arguments action_stop and action_cont allow even more precision, but we have seen that sometimes we need subclassing.

Both, the family of tasks (Task, Repeated, Periodic and Sleep) and class TwoWheelVehicle have reached a quality, which allows to realize complex programs. I hope you have some ideas. It would be great to hear from.