Introduction
The last three lessons were about classes, which organize
      tasks. This lesson is another one about
      class TwoWheelVehicle, that represents vehicles
      with two drived wheels. We will make it compatible to the task
      concept At its end we will realize a vehicle, that follows a
      course and immediately stops, when its sensor detects a barrier
      and continues the driving, when the barrier disappeares. The
      whole algorithm of this self driving vehicle will be a task
      object.
This all will be done step by step. We train our vehicle to stop immediately and to continue correct. At the end we will construct tasks of a new level of complexity.
The task factory
We add some constants to class TwoWheelVehicle:
      
DRIVE_TYPE_STRAIGHT = "straight"
DRIVE_TYPE_TURN = "turn"
DRIVE_TYPE_ROTATE_TO = "rotate_to"
DRIVE_TYPE_DRIVE_TO = "drive_to"
DRIVE_TYPE_STOP = "stop"
      
    def task_factory(self, drive_type: str, **kwargs) -> task.Task:
        speed = kwargs.pop('speed', None)
        distance = kwargs.pop('distance', None)
        radius_turn = kwargs.pop('radius_turn', None)
        angle = kwargs.pop('angle', None)
        right_turn = kwargs.pop('right_turn', False)
        orientation = kwargs.pop('orientation', None)
        pos_x = kwargs.pop('pos_x', None)
        pos_y = kwargs.pop('pos_y', None)
        brake = kwargs.pop('brake', False)
        exc = kwargs.pop('exc', None)
        if drive_type == DRIVE_TYPE_STRAIGHT:
            return task.Task(
                self.drive_straight,
                args=(speed, distance),
                exc=exc
            )
        elif drive_type == DRIVE_TYPE_TURN:
            return task.Task(
                self.drive_turn,
                args=(speed, radius_turn, angle, right_turn),
                exc=exc
            )
        elif drive_type == DRIVE_TYPE_ROTATE_TO:
            return task.Task(
                self.rotate_to,
                args=(speed, orientation),
                exc=exc
            )
        elif drive_type == DRIVE_TYPE_DRIVE_TO:
            return task.Task(
                self.drive_to,
                args=(speed, pos_x, pos_y),
                exc=exc
            )
        elif drive_type == DRIVE_TYPE_STOP:
            return task.Task(
                self.stop,
                args=(brake,),
                exc=exc
            )
      
#!/usr/bin/env python3
import ev3, ev3_vehicle, task, time
vehicle = ev3_vehicle.TwoWheelVehicle(
    0.02128,                 # radius_wheel
    0.1175,                  # tread
    protocol=ev3.BLUETOOTH,
    host='00:16:53:42:2B:99'
)
speed = 25
drive = task.concat(
    vehicle.task_factory("turn", speed, radius_turn=0.25, angle=60),
    vehicle.task_factory("straight", speed, distance=0.2),
    vehicle.task_factory("rotate_to", speed, orientation=0),
    vehicle.task_factory("stop", brake=True),
    task.Sleep(0.5),
    vehicle.task_factory("stop", brake=False)
)
drive.start()
      drive.start() returns immediately, which allows to start something
      parallel. But what happens, when we call method stop? Task objects don't
      know, that vehicles need an explicit call of method stop.
    Stopping a vehicle
We change method task_factory:
      
    def task_factory(self, drive_type: str, **kwargs) -> task.Task:
        speed = kwargs.pop('speed', None)
        distance = kwargs.pop('distance', None)
        radius_turn = kwargs.pop('radius_turn', None)
        angle = kwargs.pop('angle', None)
        right_turn = kwargs.pop('right_turn', False)
        orientation = kwargs.pop('orientation', None)
        pos_x = kwargs.pop('pos_x', None)
        pos_y = kwargs.pop('pos_y', None)
        brake = kwargs.pop('brake', False)
        exc = kwargs.pop('exc', None)
        if drive_type == DRIVE_TYPE_STRAIGHT:
            t = task.Task(
                self.drive_straight,
                args=(speed, distance),
                action_stop=self.stop,
                exc=exc
            )
        elif drive_type == DRIVE_TYPE_TURN:
            t = task.Task(
                self.drive_turn,
                args=(speed, radius_turn, angle, right_turn),
                action_stop=self.stop,
                exc=exc
            )
        elif drive_type == DRIVE_TYPE_ROTATE_TO:
            t = task.Task(
                self.rotate_to,
                args=(speed, orientation),
                action_stop=self.stop,
                exc=exc
            )
        elif drive_type == DRIVE_TYPE_DRIVE_TO:
            t = task.Task(
                self.drive_to,
                args=(speed, pos_x, pos_y),
                action_stop=self.stop,
                exc=exc
            )
        elif drive_type == DRIVE_TYPE_STOP:
            t = task.Task(
                self.stop,
                args=(brake,),
                exc=exc
            )
        return task.Task(t.start, join=True, exc=exc)
      We modify the test program:
#!/usr/bin/env python3
import ev3, ev3_vehicle, task, time
vehicle = ev3_vehicle.TwoWheelVehicle(
    0.02128,                 # radius_wheel
    0.1175,                  # tread
    protocol=ev3.BLUETOOTH,
    host='00:16:53:42:2B:99'
)
speed = 25
drive = task.concat(
    vehicle.task_factory("turn", speed, radius_turn=0.25, angle=60),
    vehicle.task_factory("straight", speed, distance=0.2),
    vehicle.task_factory("rotate_to", speed, orientation=0),
    vehicle.task_factory("stop", brake=True),
    task.Sleep(0.5),
    vehicle.task_factory("stop", brake=False)
)
drive.start()
time.sleep(1)
drive.stop()
      stop immediately returns,
      but the movement lasts longer. This type of stopping is not what we target.
      The perfect stopping reacts prompt.
    
    Fast stopping of drive_straight
Actually, the smallest unit of our tasks are the
      methods drive_straight, drive_turn, rotate_to
      and drive_to. If we want a fast stopping, we
      need to split them up into smaller
      units. F.i. we can split up drive_straight into a
      chain of tasks, which consits of a Task object and
      a Repeated object. This allows to stop while
      Repeated executes with the option to continue it. First
      we take a look into the actual code of method drive_straight:
      
    def drive_straight(self, speed: int, distance: float) -> None:
        ...
        self.move_straight(speed)
        step = round(distance * 360 / (2 * math.pi * self._radius_wheel))
        direction = math.copysign(1, speed * self._polarity)
        pos_final = [self._pos[0] + direction * step,
                     self._pos[1] + direction * step]
        while True:
            value = self._test_pos(direction, pos_final)
            if value == -1:
                break
            time.sleep(value)
      Repeated object.  We will code a chain of tasks, the first
      part will be a Task object, the second
      a Repeated. But we need to solve the problem with the
      arguments of method _test_pos. They are not known,
      when the
      Repeated task is constructed. Therefore we add a
      new attribute to class TwoWheelVehicle:
      
    def __init__(self, radius_wheel: float, tread: float,
                 protocol: str=None,
                 host: str=None,
                 ev3_obj: ev3.EV3=None):
        super().__init__(protocol=protocol,
                         host=host,
                         ev3_obj=ev3_obj)
        ...
        self._test_args = None
      We change method _test_pos from:
      
    def _test_pos(
            self,
            direction,
            pos_final: list
    ) -> float:
        if self._to_stop:
        ...
      
    def _test_pos(self) -> float:
        (direction, pos_final) = self._test_args
        if self._to_stop:
        ...
      drive_straight to:
      
    def drive_straight(self, speed: int, distance: float) -> None:
        ...
        self._test_pos_args = (direction, pos_final)
        while True:
            value = self._test_pos()
            if value == -1:
                break
            time.sleep(value)
      _drive_straight:
      
    def _drive_straight(self, speed: int, distance: float) -> None:
        self.move_straight(speed)
        step = round(distance * 360 / (2 * math.pi * self._radius_wheel))
        direction = math.copysign(1, speed * self._polarity)
        pos_final = [self._pos[0] + direction * step,
                     self._pos[1] + direction * step]
        self._test_pos_args = (direction, pos_final)
      task_factory:
      
    def task_factory(
            ...
    ) -> task.Task:
        ...
        if drive_type == DRIVE_TYPE_STRAIGHT:
            t = task.Task(
                self._drive_straight,
                args=(speed, distance),
                action_stop=self.stop,
                exc=exc
            )
            if distance != None:
                t.append(task.Repeated(self._test_pos))
        ...
      Task object, calls method _drive_straight, which
      starts the movement and calculates the arguments of method _test_pos.
      The second part is a Repeated, that ends, when the final position
      is reached.
    
    We test the new code with this program:
#!/usr/bin/env python3
import ev3, ev3_vehicle, task, time
vehicle = ev3_vehicle.TwoWheelVehicle(
    0.02128,                 # radius_wheel
    0.1175,                  # tread
    protocol=ev3.BLUETOOTH,
    host='00:16:53:42:2B:99'
)
vehicle.verbosity = 1
speed = 25
drive = vehicle.task_factory("straight", speed, distance=0.5)
drive.start()
time.sleep(1)
drive.stop()
      
21:22:47.812922 Sent 0x|1F:00|2A:00|00|08:00|B0:00:09:19:00:00:00:A6:00:09:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
21:22:47.860821 Recv 0x|0B:00|2A:00|02|B0:0C:00:00:C0:0C:00:00|
21:22:47.962883 Sent 0x|15:00|2B:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
21:22:48.004716 Recv 0x|0B:00|2B:00|02|CC:0C:00:00:DC:0C:00:00|
21:22:48.250600 Sent 0x|15:00|2C:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
21:22:48.300776 Recv 0x|0B:00|2C:00|02|17:0D:00:00:27:0D:00:00|
21:22:48.814450 Sent 0x|19:00|2D:00|00|08:00|A3:00:09:00:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
21:22:48.876683 Recv 0x|0B:00|2D:00|02|AC:0D:00:00:BB:0D:00:00|
      Continuing drive_straight
When we call method stop, we stop the motors. We
      have to restart them, when we continue. This is, what the task's
      argument action_cont is thought for. If you look
      inside method _test_pos, you will realize a second
      problem. This method calculates the remaining time from the time
      gap since its last call.  Stopping and continuing will disturb
      this algorithm. Therefore continuation needs to act like the
      start of a new movement.  We add a new attribute to
      class TwoWheelVehicle:
      
    def __init__(self, radius_wheel: float, tread: float,
                 protocol: str=None,
                 host: str=None,
                 ev3_obj: ev3.EV3=None):
        ...
        self._turn = None
        self._speed = None
        ...
      We add a line to method move:
      
    def move(self, speed: int, turn: int) -> None:
        ...
        self._turn = turn
        self._speed = speed
        self._moves = True
      We add the following method:
    def _vehicle_cont(self):
        self.move(self._speed, self._turn)
        self._to_stop = False
        self._last_t = None
      self._last_t = None demands the loop to act like a new movement.
    
    We modify method task_factory:
      
        if drive_type == DRIVE_TYPE_STRAIGHT:
            t = task.Task(
                self._drive_straight,
                args=(speed, distance),
                action_stop=self.stop,
                action_cont=self._vehicle_cont,
                exc=exc
            )
            if distance != None:
                t.append(task.Repeated(self._test_pos))
      _vehicle_cont when the task continues.
    
    We change method drive_straight to start a task
      and join it:
      
    def drive_straight(self, speed: int, distance: float) -> None:
        self.task_factory(
            DRIVE_TYPE_STRAIGHT,
            speed,
            distance=distance
        ).start().join()
      drive_straight.
      Now we end with the opposite situation, method drive_straight calls the factory!
    
    We test the continuation with this program:
#!/usr/bin/env python3
import ev3, ev3_vehicle, task, time, datetime
vehicle = ev3_vehicle.TwoWheelVehicle(
    0.02128,                 # radius_wheel
    0.1175,                  # tread
    protocol=ev3.BLUETOOTH,
    host='00:16:53:42:2B:99'
)
vehicle.verbosity = 1
speed = 25
drive = task.concat(
    vehicle.task_factory("straight", speed=speed, distance=0.5),
    vehicle.task_factory("stop", brake=True),
    task.Sleep(0.5),
    vehicle.task_factory("stop")
)
drive.start()
time.sleep(3)
drive.stop()
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(now, "*** called stop ***")
time.sleep(2)
drive.cont()
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(now, "*** called cont ***")
      - creating and administrating class TwoWheelVehicle,
- constructing task drive(a chain of tasks),
- opearating task drive(needs no knowledge of the tasks inner structure).
its output:
09:17:04.074805 Sent 0x|1F:00|2A:00|00|08:00|B0:00:09:19:00:00:00:A6:00:09:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:04.124579 Recv 0x|0B:00|2A:00|02|78:0F:00:00:7B:0F:00:00|
09:17:04.226509 Sent 0x|15:00|2B:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:04.294573 Recv 0x|0B:00|2B:00|02|9D:0F:00:00:A0:0F:00:00|
09:17:04.566396 Sent 0x|15:00|2C:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:04.630528 Recv 0x|0B:00|2C:00|02|F4:0F:00:00:F5:0F:00:00|
09:17:05.237855 Sent 0x|15:00|2D:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:05.313547 Recv 0x|0B:00|2D:00|02|9F:10:00:00:A2:10:00:00|
09:17:06.604481 Sent 0x|15:00|2E:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:06.671509 Recv 0x|0B:00|2E:00|02|FA:11:00:00:FD:11:00:00|
09:17:07.077801 *** called stop ***
09:17:07.078564 Sent 0x|19:00|2F:00|00|08:00|A3:00:09:00:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:07.286495 Recv 0x|0B:00|2F:00|02|97:12:00:00:9A:12:00:00|
09:17:09.081348 *** called cont ***
09:17:10.608654 Sent 0x|1F:00|30:00|00|08:00|B0:00:09:19:00:00:00:A6:00:09:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:10.667424 Recv 0x|0B:00|30:00|02|C4:12:00:00:C6:12:00:00|
09:17:10.769170 Sent 0x|15:00|31:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:10.812384 Recv 0x|0B:00|31:00|02|E4:12:00:00:E5:12:00:00|
09:17:11.058759 Sent 0x|15:00|32:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:12.015337 Recv 0x|0B:00|32:00|02|17:14:00:00:18:14:00:00|
09:17:12.017042 Sent 0x|19:00|33:00|00|08:00|A3:00:09:01:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:12.060297 Recv 0x|0B:00|33:00|02|21:14:00:00:22:14:00:00|
09:17:12.563204 Sent 0x|19:00|34:00|00|08:00|A3:00:09:00:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:12.613220 Recv 0x|0B:00|34:00|02|21:14:00:00:22:14:00:00|
      cont and the sending of the direct command, that continues the movement. This is worth to analyse.
    
    Subclassing Task
We remember how Task objects are
      contined. Attribute _time_action holds the time of
      the next execution of their action. Method _cont2
      determines the time of the next action of the continued task
      itself and all its contained tasks. When calling
      method _cont2 of the contained tasks it sets
      argument time_delta so that the old synchronization
      is reconstructed. This was fine for most situations but
      method sound of class Jukebox
      subclassed Task to realize the correct timing. In
      our situation, once more the standatrd is not what we need. Our
      continuation of the Repeated object is more the
      start of a new movement. This says, we again have to manipulate
      attribute _time_action to get the correct
      result. We do it the following way:
      
    def task_factory(self, drive_type: str, **kwargs) -> task.Task:
        ...
        class _Drive(task.Task):
            def stop(self):
                super().stop()
                self._time_action = time.time()
        if drive_type == DRIVE_TYPE_STRAIGHT:
            t = _Drive(
                    self._drive_straight,
                    args=(speed, distance),
                    action_stop=self.stop,
                    action_cont=self._vehicle_cont,
                    exc=exc
            )
            if distance != None:
                t.append(task.Repeated(self._test_pos))
        ...
      _Drive is a subclass
      of task.Task. We modify the root task and
      not task.Repeated because method stop
      always is called via the root task. The only modification
      of _Drive objects is that their
      method stop sets _time_action to the
      actual time. I repeated the test and got this output:
      
09:45:54.792138 Sent 0x|1F:00|2A:00|00|08:00|B0:00:09:19:00:00:00:A6:00:09:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:54.837971 Recv 0x|0B:00|2A:00|02|21:14:00:00:22:14:00:00|
09:45:54.939915 Sent 0x|15:00|2B:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:54.983938 Recv 0x|0B:00|2B:00|02|40:14:00:00:40:14:00:00|
09:45:55.231559 Sent 0x|15:00|2C:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:55.309953 Recv 0x|0B:00|2C:00|02|8A:14:00:00:8B:14:00:00|
09:45:55.883439 Sent 0x|15:00|2D:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:55.936879 Recv 0x|0B:00|2D:00|02|35:15:00:00:36:15:00:00|
09:45:57.137283 Sent 0x|15:00|2E:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:57.186836 Recv 0x|0B:00|2E:00|02|74:16:00:00:74:16:00:00|
09:45:57.795134 *** called stop ***
09:45:57.795901 Sent 0x|19:00|2F:00|00|08:00|A3:00:09:00:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:57.856792 Recv 0x|0B:00|2F:00|02|20:17:00:00:20:17:00:00|
09:45:59.798555 *** called cont ***
09:45:59.862278 Sent 0x|1F:00|30:00|00|08:00|B0:00:09:19:00:00:00:A6:00:09:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:59.917685 Recv 0x|0B:00|30:00|02|3E:17:00:00:3F:17:00:00|
09:46:00.019364 Sent 0x|15:00|31:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:46:00.396673 Recv 0x|0B:00|31:00|02|AE:17:00:00:AF:17:00:00|
09:46:00.977447 Sent 0x|15:00|32:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:46:01.392575 Recv 0x|0B:00|32:00|02|AF:18:00:00:AF:18:00:00|
09:46:01.635565 Sent 0x|19:00|33:00|00|08:00|A3:00:09:01:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:46:01.693532 Recv 0x|0B:00|33:00|02|FB:18:00:00:FC:18:00:00|
09:46:02.196212 Sent 0x|19:00|34:00|00|08:00|A3:00:09:00:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:46:02.428546 Recv 0x|0B:00|34:00|02|FB:18:00:00:FB:18:00:00|
      Method drive_turn
I think, it's straight forward to code the changes of methods drive_turn, test_o and
      task_factory. At the end, it behaves like method drive_straight.
Method rotate_to
Actually method rotate_to calls
      method drive_turn. We have to change that. Once again
      we don't know the arguments, when the task is constructed.
      We code a new method _rotate_to:
      
    def _rotate_to(self, speed: int, orientation: float) -> None:
        diff = orientation - self._orientation
        diff += 180
        diff %= 360
        diff -= 180
        if diff >= 0:
            right_turn = False
            direction = 1
        else:
            right_turn = True
            direction = -1
        if abs(diff) >= 1:
            o_orig = self._orientation
            self.move_turn(speed, radius_turn=0, right_turn=right_turn)
            if diff > 0 and self._orientation > o_orig + diff:
                diff += 360
            elif diff < 0 and self._orientation < o_orig + diff:
                diff -= 360
        delta_pos = self._polarity * diff * 0.5 * self._tread / self._radius_wheel
        final_pos = [self._pos[0] - delta_pos,
                     self._pos[1] + delta_pos]
        self._test_args = (direction, o_orig + diff, final_pos)
     _drive_straight. We start
      the movement and we calculate the arguments of method _test_o.
    
    We modify method task_factory:
      
        ...
        elif drive_type == DRIVE_TYPE_ROTATE_TO:
            t = task.concat(
                _Drive(
                    self._rotate_to,
                    args=(speed, orientation),
                    action_stop=self.stop,
                    action_cont=self._vehicle_cont,
                    exc=exc
                ),
                task.Repeated(self._test_o)
            )
        ...
      And we let method rotate_to call the task factory:
      
    def rotate_to(self, speed: int, orientation: float) -> None:
        self.task_factory(
            DRIVE_TYPE_ROTATE_TO,
            speed,
            orientation=orientation
        ).start().join()
      Method drive_to
Method drive_to combines a turn with a straight movement.
      Again we don't know the arguments, which says we can't call methods rotate_to
      or drive_straight, but we will call _rotate_to and _drive_straight.
      Altogether we construct a chain of tasks with four links.
      We modify method task_factory:
      
 ...
        elif drive_type == DRIVE_TYPE_DRIVE_TO:
            t = task.concat(
                _Drive(
                    self._drive_to_1,
                    args=(speed, pos_x, pos_y),
                    action_stop=self.stop,
                    action_cont=self._vehicle_cont,
                    exc=exc,
                ),
                task.Repeated(self._test_o),
                task.Task(
                    self._drive_to_2,
                    args=(speed, pos_x, pos_y)
                ),
                task.Repeated(self._test_pos)
            )
        ...
      _drive_to_1 will start the turn movement and
      calculate the final orientation. Method _drive_to_2
      starts a straight movement and calculates the final position of the wheels.
    
    We code the new method _drive_to_1:
      
    def _drive_to_1(
            self,
            speed: int,
            pos_x: float,
            pos_y: float
    ) -> None:
        diff_x = pos_x - self._pos_x
        diff_y = pos_y - self._pos_y
        dist = math.sqrt(diff_x**2 + diff_y**2)
        if abs(diff_x) > abs(diff_y):
            direct = math.degrees(math.atan(diff_y/diff_x))
        else:
            fract = diff_x / diff_y
            sign = math.copysign(1.0, fract)
            direct = sign * 90 - math.degrees(math.atan(fract))
        if diff_x < 0:
            direct += 180
        self._rotate_to(speed, direct)
      And a new method _drive_to_2:
      
    def _drive_to_2(
            self,
            speed: int,
            pos_x: float,
            pos_y: float
    ) -> None:
        diff_x = pos_x - self._pos_x
        diff_y = pos_y - self._pos_y
        dist = math.sqrt(diff_x**2 + diff_y**2)
        self._drive_straight(speed, dist)
      Again method drive_to calls the factory:
      
    def drive_to(
            self,
            speed: int,
            pos_x: float,
            pos_y: float
    ) -> None:
        self.task_factory(
            DRIVE_TYPE_DRIVE_TO,
            speed,
            pos_x=pos_x,
            pos_y=pos_y
        ).start().join()
      Self driving vehicles
We use the Follow Me vehicle (lesson 5) with its infrared sensor. We want the vehicle to follow a course, but stop, when it detects a barrier. When the obstacle disappeares, the vehicle again follows its course (two times the shape of the digit eight). This is not, what Google or the car manufacturers mean by self driving, but it is what we can achieve with about 80 lines of code.
#!/usr/bin/env python3
import ev3, ev3_vehicle, task, struct
sensor = ev3.EV3(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
vehicle = ev3_vehicle.TwoWheelVehicle(0.02128, 0.1175, ev3_obj=sensor)
speed = 50
sensor_gap = 0.1
def look_forward(cont: bool=False):
    ops = b''.join([
        ev3.opInput_Device,
        ev3.READY_RAW,
        ev3.LCX(0),          # LAYER
        ev3.LCX(1),          # NO
        ev3.LCX(33),         # TYPE - EV3-IR
        ev3.LCX(0),          # MODE - Proximity
        ev3.LCX(1),          # VALUES
        ev3.GVX(0)           # VALUE1
    ])
    reply = sensor.send_direct_cmd(ops, global_mem=4)
    dist = struct.unpack('<i', reply[5:])[0]
    if dist < 10 and not cont:
        return True
    elif dist >= 10 and cont:
        return True
t_drive = task.concat(
    task.Repeated(
        task.concat(
            vehicle.task_factory(
                "turn",
                speed=speed,
                radius_turn=0.25,
                angle=360
            ),
            vehicle.task_factory(
                "turn",
                speed=speed,
                radius_turn=-0.25,
                angle=360
            )
        ).start,
        num=2
    ),
    vehicle.task_factory("stop", brake=True),
    task.Sleep(0.5),
    vehicle.task_factory("stop")
)
t_controller = task.concat(
    task.Task(t_drive.start),
    task.Periodic(
        sensor_gap,
        look_forward
    ),
    task.Task(t_drive.stop),
    task.Repeated(
        task.concat(
            task.Periodic(
                sensor_gap,
                look_forward,
                kwargs={"cont": True}
            ),
            task.Task(t_drive.cont, args=(0,)),
            task.Periodic(
                sensor_gap,
                look_forward
            ),
            task.Task(t_drive.stop)
        ).start
    )
)
t_drive = task.concat(
    t_drive,
    task.Task(t_controller.stop)
)
t_controller.start()
      t_controller starts, stops and
      continues task t_drive, but it
      is t_drive that finally
      stops t_controller, when the course ends.
    The interesting part is task t_controller. This
      happens with
      task t_drive: start, stop,
      then an unlimited number of cont
      and stop.  It's important, that the iteration
      of Repeated ends with stop. Let's look
      at an alternative formulation:
      
t_controller = task.concat(
    task.Task(t_drive.start),
    task.Repeated(
        task.concat(
            task.Periodic(
                sensor_gap,
                look_forward
            ),
            task.Task(t_drive.stop),
            task.Periodic(
                sensor_gap,
                look_forward,
                kwargs={"cont": True}
            ),
            task.Task(t_drive.cont, args=(0,))
        ).start
    )
)
 t_drive.cont is
 joined until the driving ends and this prevents the next iteration of the surrounding Repeated.
    
    
    Conclusion
The self driving vehicle demonstrates a mechanism of regulation by a controller, that reads sensor values. The task concept has its own coding patterns and it needs some experience to get familiar with it. Coding tasks is not simple, but it helps for an incremental working, where complexity grows step by step. When the construction of the tasks is done, they hide the compexity behind an API which always keeps its simple surface to the outside world.
Recursion makes stopping and continuation to impressive
      methods. The optional arguments action_stop
      and action_cont allow even more precision, but
      we have seen that sometimes we need subclassing.
Both, the family of tasks
      (Task, Repeated, Periodic
      and Sleep) and class TwoWheelVehicle
      have reached a quality, which allows to realize complex
      programs. I hope you have some ideas. It would be great to hear
      from.
 
No comments:
Post a Comment