Introduction
The last three lessons were about classes, which organize
tasks. This lesson is another one about
class TwoWheelVehicle
, that represents vehicles
with two drived wheels. We will make it compatible to the task
concept At its end we will realize a vehicle, that follows a
course and immediately stops, when its sensor detects a barrier
and continues the driving, when the barrier disappeares. The
whole algorithm of this self driving vehicle will be a task
object.
This all will be done step by step. We train our vehicle to stop immediately and to continue correct. At the end we will construct tasks of a new level of complexity.
The task factory
We add some constants to class TwoWheelVehicle
:
DRIVE_TYPE_STRAIGHT = "straight"
DRIVE_TYPE_TURN = "turn"
DRIVE_TYPE_ROTATE_TO = "rotate_to"
DRIVE_TYPE_DRIVE_TO = "drive_to"
DRIVE_TYPE_STOP = "stop"
We add a new method:
def task_factory(self, drive_type: str, **kwargs) -> task.Task:
speed = kwargs.pop('speed', None)
distance = kwargs.pop('distance', None)
radius_turn = kwargs.pop('radius_turn', None)
angle = kwargs.pop('angle', None)
right_turn = kwargs.pop('right_turn', False)
orientation = kwargs.pop('orientation', None)
pos_x = kwargs.pop('pos_x', None)
pos_y = kwargs.pop('pos_y', None)
brake = kwargs.pop('brake', False)
exc = kwargs.pop('exc', None)
if drive_type == DRIVE_TYPE_STRAIGHT:
return task.Task(
self.drive_straight,
args=(speed, distance),
exc=exc
)
elif drive_type == DRIVE_TYPE_TURN:
return task.Task(
self.drive_turn,
args=(speed, radius_turn, angle, right_turn),
exc=exc
)
elif drive_type == DRIVE_TYPE_ROTATE_TO:
return task.Task(
self.rotate_to,
args=(speed, orientation),
exc=exc
)
elif drive_type == DRIVE_TYPE_DRIVE_TO:
return task.Task(
self.drive_to,
args=(speed, pos_x, pos_y),
exc=exc
)
elif drive_type == DRIVE_TYPE_STOP:
return task.Task(
self.stop,
args=(brake,),
exc=exc
)
We test it:
#!/usr/bin/env python3
import ev3, ev3_vehicle, task, time
vehicle = ev3_vehicle.TwoWheelVehicle(
0.02128, # radius_wheel
0.1175, # tread
protocol=ev3.BLUETOOTH,
host='00:16:53:42:2B:99'
)
speed = 25
drive = task.concat(
vehicle.task_factory("turn", speed, radius_turn=0.25, angle=60),
vehicle.task_factory("straight", speed, distance=0.2),
vehicle.task_factory("rotate_to", speed, orientation=0),
vehicle.task_factory("stop", brake=True),
task.Sleep(0.5),
vehicle.task_factory("stop", brake=False)
)
drive.start()
It works, the vehicle does its three moves, then stops hard (with brake).
drive.start()
returns immediately, which allows to start something
parallel. But what happens, when we call method stop? Task
objects don't
know, that vehicles need an explicit call of method stop
.
Stopping a vehicle
We change method task_factory
:
def task_factory(self, drive_type: str, **kwargs) -> task.Task:
speed = kwargs.pop('speed', None)
distance = kwargs.pop('distance', None)
radius_turn = kwargs.pop('radius_turn', None)
angle = kwargs.pop('angle', None)
right_turn = kwargs.pop('right_turn', False)
orientation = kwargs.pop('orientation', None)
pos_x = kwargs.pop('pos_x', None)
pos_y = kwargs.pop('pos_y', None)
brake = kwargs.pop('brake', False)
exc = kwargs.pop('exc', None)
if drive_type == DRIVE_TYPE_STRAIGHT:
t = task.Task(
self.drive_straight,
args=(speed, distance),
action_stop=self.stop,
exc=exc
)
elif drive_type == DRIVE_TYPE_TURN:
t = task.Task(
self.drive_turn,
args=(speed, radius_turn, angle, right_turn),
action_stop=self.stop,
exc=exc
)
elif drive_type == DRIVE_TYPE_ROTATE_TO:
t = task.Task(
self.rotate_to,
args=(speed, orientation),
action_stop=self.stop,
exc=exc
)
elif drive_type == DRIVE_TYPE_DRIVE_TO:
t = task.Task(
self.drive_to,
args=(speed, pos_x, pos_y),
action_stop=self.stop,
exc=exc
)
elif drive_type == DRIVE_TYPE_STOP:
t = task.Task(
self.stop,
args=(brake,),
exc=exc
)
return task.Task(t.start, join=True, exc=exc)
This makes all movements to contained tasks. They are root tasks and will stay root tasks, even when
the returned task object is appended to another task object. We already discussed it in lesson 9.
We modify the test program:
#!/usr/bin/env python3
import ev3, ev3_vehicle, task, time
vehicle = ev3_vehicle.TwoWheelVehicle(
0.02128, # radius_wheel
0.1175, # tread
protocol=ev3.BLUETOOTH,
host='00:16:53:42:2B:99'
)
speed = 25
drive = task.concat(
vehicle.task_factory("turn", speed, radius_turn=0.25, angle=60),
vehicle.task_factory("straight", speed, distance=0.2),
vehicle.task_factory("rotate_to", speed, orientation=0),
vehicle.task_factory("stop", brake=True),
task.Sleep(0.5),
vehicle.task_factory("stop", brake=False)
)
drive.start()
time.sleep(1)
drive.stop()
It turns, then stops, this is, what we expected. The call of method stop
immediately returns,
but the movement lasts longer. This type of stopping is not what we target.
The perfect stopping reacts prompt.
Fast stopping of drive_straight
Actually, the smallest unit of our tasks are the
methods drive_straight
, drive_turn
, rotate_to
and drive_to
. If we want a fast stopping, we
need to split them up into smaller
units. F.i. we can split up drive_straight
into a
chain of tasks, which consits of a Task
object and
a Repeated
object. This allows to stop while
Repeated executes with the option to continue it. First
we take a look into the actual code of method drive_straight
:
def drive_straight(self, speed: int, distance: float) -> None:
...
self.move_straight(speed)
step = round(distance * 360 / (2 * math.pi * self._radius_wheel))
direction = math.copysign(1, speed * self._polarity)
pos_final = [self._pos[0] + direction * step,
self._pos[1] + direction * step]
while True:
value = self._test_pos(direction, pos_final)
if value == -1:
break
time.sleep(value)
It seems easy to realize the lower part with a
Repeated
object. We will code a chain of tasks, the first
part will be a Task
object, the second
a Repeated
. But we need to solve the problem with the
arguments of method _test_pos
. They are not known,
when the
Repeated
task is constructed. Therefore we add a
new attribute to class TwoWheelVehicle
:
def __init__(self, radius_wheel: float, tread: float,
protocol: str=None,
host: str=None,
ev3_obj: ev3.EV3=None):
super().__init__(protocol=protocol,
host=host,
ev3_obj=ev3_obj)
...
self._test_args = None
We change method _test_pos
from:
def _test_pos(
self,
direction,
pos_final: list
) -> float:
if self._to_stop:
...
to:
def _test_pos(self) -> float:
(direction, pos_final) = self._test_args
if self._to_stop:
...
and method drive_straight
to:
def drive_straight(self, speed: int, distance: float) -> None:
...
self._test_pos_args = (direction, pos_final)
while True:
value = self._test_pos()
if value == -1:
break
time.sleep(value)
This solves the problem with the arguments! Now we can write a new method _drive_straight
:
def _drive_straight(self, speed: int, distance: float) -> None:
self.move_straight(speed)
step = round(distance * 360 / (2 * math.pi * self._radius_wheel))
direction = math.copysign(1, speed * self._polarity)
pos_final = [self._pos[0] + direction * step,
self._pos[1] + direction * step]
self._test_pos_args = (direction, pos_final)
and we modify method task_factory
:
def task_factory(
...
) -> task.Task:
...
if drive_type == DRIVE_TYPE_STRAIGHT:
t = task.Task(
self._drive_straight,
args=(speed, distance),
action_stop=self.stop,
exc=exc
)
if distance != None:
t.append(task.Repeated(self._test_pos))
...
The first part, the Task
object, calls method _drive_straight
, which
starts the movement and calculates the arguments of method _test_pos
.
The second part is a Repeated
, that ends, when the final position
is reached.
We test the new code with this program:
#!/usr/bin/env python3
import ev3, ev3_vehicle, task, time
vehicle = ev3_vehicle.TwoWheelVehicle(
0.02128, # radius_wheel
0.1175, # tread
protocol=ev3.BLUETOOTH,
host='00:16:53:42:2B:99'
)
vehicle.verbosity = 1
speed = 25
drive = vehicle.task_factory("straight", speed, distance=0.5)
drive.start()
time.sleep(1)
drive.stop()
its output:
21:22:47.812922 Sent 0x|1F:00|2A:00|00|08:00|B0:00:09:19:00:00:00:A6:00:09:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
21:22:47.860821 Recv 0x|0B:00|2A:00|02|B0:0C:00:00:C0:0C:00:00|
21:22:47.962883 Sent 0x|15:00|2B:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
21:22:48.004716 Recv 0x|0B:00|2B:00|02|CC:0C:00:00:DC:0C:00:00|
21:22:48.250600 Sent 0x|15:00|2C:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
21:22:48.300776 Recv 0x|0B:00|2C:00|02|17:0D:00:00:27:0D:00:00|
21:22:48.814450 Sent 0x|19:00|2D:00|00|08:00|A3:00:09:00:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
21:22:48.876683 Recv 0x|0B:00|2D:00|02|AC:0D:00:00:BB:0D:00:00|
The stopping occured one second after the start of the movement, not when the whole distance
was moved. Fast stopping is realized!
Continuing drive_straight
When we call method stop
, we stop the motors. We
have to restart them, when we continue. This is, what the task's
argument action_cont
is thought for. If you look
inside method _test_pos
, you will realize a second
problem. This method calculates the remaining time from the time
gap since its last call. Stopping and continuing will disturb
this algorithm. Therefore continuation needs to act like the
start of a new movement. We add a new attribute to
class TwoWheelVehicle
:
def __init__(self, radius_wheel: float, tread: float,
protocol: str=None,
host: str=None,
ev3_obj: ev3.EV3=None):
...
self._turn = None
self._speed = None
...
We add a line to method move
:
def move(self, speed: int, turn: int) -> None:
...
self._turn = turn
self._speed = speed
self._moves = True
We add the following method:
def _vehicle_cont(self):
self.move(self._speed, self._turn)
self._to_stop = False
self._last_t = None
Setting self._last_t = None
demands the loop to act like a new movement.
We modify method task_factory
:
if drive_type == DRIVE_TYPE_STRAIGHT:
t = task.Task(
self._drive_straight,
args=(speed, distance),
action_stop=self.stop,
action_cont=self._vehicle_cont,
exc=exc
)
if distance != None:
t.append(task.Repeated(self._test_pos))
This will call method _vehicle_cont
when the task continues.
We change method drive_straight
to start a task
and join it:
def drive_straight(self, speed: int, distance: float) -> None:
self.task_factory(
DRIVE_TYPE_STRAIGHT,
speed,
distance=distance
).start().join()
This helps to prevent double code. We started with a factory that called method drive_straight
.
Now we end with the opposite situation, method drive_straight
calls the factory!
We test the continuation with this program:
#!/usr/bin/env python3
import ev3, ev3_vehicle, task, time, datetime
vehicle = ev3_vehicle.TwoWheelVehicle(
0.02128, # radius_wheel
0.1175, # tread
protocol=ev3.BLUETOOTH,
host='00:16:53:42:2B:99'
)
vehicle.verbosity = 1
speed = 25
drive = task.concat(
vehicle.task_factory("straight", speed=speed, distance=0.5),
vehicle.task_factory("stop", brake=True),
task.Sleep(0.5),
vehicle.task_factory("stop")
)
drive.start()
time.sleep(3)
drive.stop()
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(now, "*** called stop ***")
time.sleep(2)
drive.cont()
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(now, "*** called cont ***")
As you can see, the program consits of three parts:
- creating and administrating class
TwoWheelVehicle
, - constructing task
drive
(a chain of tasks), - opearating task
drive
(needs no knowledge of the tasks inner structure).
its output:
09:17:04.074805 Sent 0x|1F:00|2A:00|00|08:00|B0:00:09:19:00:00:00:A6:00:09:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:04.124579 Recv 0x|0B:00|2A:00|02|78:0F:00:00:7B:0F:00:00|
09:17:04.226509 Sent 0x|15:00|2B:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:04.294573 Recv 0x|0B:00|2B:00|02|9D:0F:00:00:A0:0F:00:00|
09:17:04.566396 Sent 0x|15:00|2C:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:04.630528 Recv 0x|0B:00|2C:00|02|F4:0F:00:00:F5:0F:00:00|
09:17:05.237855 Sent 0x|15:00|2D:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:05.313547 Recv 0x|0B:00|2D:00|02|9F:10:00:00:A2:10:00:00|
09:17:06.604481 Sent 0x|15:00|2E:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:06.671509 Recv 0x|0B:00|2E:00|02|FA:11:00:00:FD:11:00:00|
09:17:07.077801 *** called stop ***
09:17:07.078564 Sent 0x|19:00|2F:00|00|08:00|A3:00:09:00:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:07.286495 Recv 0x|0B:00|2F:00|02|97:12:00:00:9A:12:00:00|
09:17:09.081348 *** called cont ***
09:17:10.608654 Sent 0x|1F:00|30:00|00|08:00|B0:00:09:19:00:00:00:A6:00:09:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:10.667424 Recv 0x|0B:00|30:00|02|C4:12:00:00:C6:12:00:00|
09:17:10.769170 Sent 0x|15:00|31:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:10.812384 Recv 0x|0B:00|31:00|02|E4:12:00:00:E5:12:00:00|
09:17:11.058759 Sent 0x|15:00|32:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:12.015337 Recv 0x|0B:00|32:00|02|17:14:00:00:18:14:00:00|
09:17:12.017042 Sent 0x|19:00|33:00|00|08:00|A3:00:09:01:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:12.060297 Recv 0x|0B:00|33:00|02|21:14:00:00:22:14:00:00|
09:17:12.563204 Sent 0x|19:00|34:00|00|08:00|A3:00:09:00:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:12.613220 Recv 0x|0B:00|34:00|02|21:14:00:00:22:14:00:00|
At a first look it works as designed! When continued, it again has small time gaps between the readings of the wheels positions, which then grow.
A closer look at the times shows, that the continuation doesn't occur 2 sec. after the stopping. There is a gap of about 1.5 sec. between the call
of method cont
and the sending of the direct command, that continues the movement. This is worth to analyse.
Subclassing Task
We remember how Task
objects are
contined. Attribute _time_action
holds the time of
the next execution of their action. Method _cont2
determines the time of the next action of the continued task
itself and all its contained tasks. When calling
method _cont2
of the contained tasks it sets
argument time_delta
so that the old synchronization
is reconstructed. This was fine for most situations but
method sound
of class Jukebox
subclassed Task
to realize the correct timing. In
our situation, once more the standatrd is not what we need. Our
continuation of the Repeated
object is more the
start of a new movement. This says, we again have to manipulate
attribute _time_action
to get the correct
result. We do it the following way:
def task_factory(self, drive_type: str, **kwargs) -> task.Task:
...
class _Drive(task.Task):
def stop(self):
super().stop()
self._time_action = time.time()
if drive_type == DRIVE_TYPE_STRAIGHT:
t = _Drive(
self._drive_straight,
args=(speed, distance),
action_stop=self.stop,
action_cont=self._vehicle_cont,
exc=exc
)
if distance != None:
t.append(task.Repeated(self._test_pos))
...
Class _Drive
is a subclass
of task.Task
. We modify the root task and
not task.Repeated
because method stop
always is called via the root task. The only modification
of _Drive
objects is that their
method stop
sets _time_action
to the
actual time. I repeated the test and got this output:
09:45:54.792138 Sent 0x|1F:00|2A:00|00|08:00|B0:00:09:19:00:00:00:A6:00:09:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:54.837971 Recv 0x|0B:00|2A:00|02|21:14:00:00:22:14:00:00|
09:45:54.939915 Sent 0x|15:00|2B:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:54.983938 Recv 0x|0B:00|2B:00|02|40:14:00:00:40:14:00:00|
09:45:55.231559 Sent 0x|15:00|2C:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:55.309953 Recv 0x|0B:00|2C:00|02|8A:14:00:00:8B:14:00:00|
09:45:55.883439 Sent 0x|15:00|2D:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:55.936879 Recv 0x|0B:00|2D:00|02|35:15:00:00:36:15:00:00|
09:45:57.137283 Sent 0x|15:00|2E:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:57.186836 Recv 0x|0B:00|2E:00|02|74:16:00:00:74:16:00:00|
09:45:57.795134 *** called stop ***
09:45:57.795901 Sent 0x|19:00|2F:00|00|08:00|A3:00:09:00:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:57.856792 Recv 0x|0B:00|2F:00|02|20:17:00:00:20:17:00:00|
09:45:59.798555 *** called cont ***
09:45:59.862278 Sent 0x|1F:00|30:00|00|08:00|B0:00:09:19:00:00:00:A6:00:09:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:59.917685 Recv 0x|0B:00|30:00|02|3E:17:00:00:3F:17:00:00|
09:46:00.019364 Sent 0x|15:00|31:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:46:00.396673 Recv 0x|0B:00|31:00|02|AE:17:00:00:AF:17:00:00|
09:46:00.977447 Sent 0x|15:00|32:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:46:01.392575 Recv 0x|0B:00|32:00|02|AF:18:00:00:AF:18:00:00|
09:46:01.635565 Sent 0x|19:00|33:00|00|08:00|A3:00:09:01:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:46:01.693532 Recv 0x|0B:00|33:00|02|FB:18:00:00:FC:18:00:00|
09:46:02.196212 Sent 0x|19:00|34:00|00|08:00|A3:00:09:00:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:46:02.428546 Recv 0x|0B:00|34:00|02|FB:18:00:00:FB:18:00:00|
This is what I hoped to see!
Method drive_turn
I think, it's straight forward to code the changes of methods drive_turn
, test_o
and
task_factory
. At the end, it behaves like method drive_straight
.
Method rotate_to
Actually method rotate_to
calls
method drive_turn
. We have to change that. Once again
we don't know the arguments, when the task is constructed.
We code a new method _rotate_to
:
def _rotate_to(self, speed: int, orientation: float) -> None:
diff = orientation - self._orientation
diff += 180
diff %= 360
diff -= 180
if diff >= 0:
right_turn = False
direction = 1
else:
right_turn = True
direction = -1
if abs(diff) >= 1:
o_orig = self._orientation
self.move_turn(speed, radius_turn=0, right_turn=right_turn)
if diff > 0 and self._orientation > o_orig + diff:
diff += 360
elif diff < 0 and self._orientation < o_orig + diff:
diff -= 360
delta_pos = self._polarity * diff * 0.5 * self._tread / self._radius_wheel
final_pos = [self._pos[0] - delta_pos,
self._pos[1] + delta_pos]
self._test_args = (direction, o_orig + diff, final_pos)
You see the analogy with method _drive_straight
. We start
the movement and we calculate the arguments of method _test_o
.
We modify method task_factory
:
...
elif drive_type == DRIVE_TYPE_ROTATE_TO:
t = task.concat(
_Drive(
self._rotate_to,
args=(speed, orientation),
action_stop=self.stop,
action_cont=self._vehicle_cont,
exc=exc
),
task.Repeated(self._test_o)
)
...
And we let method rotate_to
call the task factory:
def rotate_to(self, speed: int, orientation: float) -> None:
self.task_factory(
DRIVE_TYPE_ROTATE_TO,
speed,
orientation=orientation
).start().join()
Method drive_to
Method drive_to
combines a turn with a straight movement.
Again we don't know the arguments, which says we can't call methods rotate_to
or drive_straight
, but we will call _rotate_to
and _drive_straight
.
Altogether we construct a chain of tasks with four links.
We modify method task_factory
:
...
elif drive_type == DRIVE_TYPE_DRIVE_TO:
t = task.concat(
_Drive(
self._drive_to_1,
args=(speed, pos_x, pos_y),
action_stop=self.stop,
action_cont=self._vehicle_cont,
exc=exc,
),
task.Repeated(self._test_o),
task.Task(
self._drive_to_2,
args=(speed, pos_x, pos_y)
),
task.Repeated(self._test_pos)
)
...
Method _drive_to_1
will start the turn movement and
calculate the final orientation. Method _drive_to_2
starts a straight movement and calculates the final position of the wheels.
We code the new method _drive_to_1
:
def _drive_to_1(
self,
speed: int,
pos_x: float,
pos_y: float
) -> None:
diff_x = pos_x - self._pos_x
diff_y = pos_y - self._pos_y
dist = math.sqrt(diff_x**2 + diff_y**2)
if abs(diff_x) > abs(diff_y):
direct = math.degrees(math.atan(diff_y/diff_x))
else:
fract = diff_x / diff_y
sign = math.copysign(1.0, fract)
direct = sign * 90 - math.degrees(math.atan(fract))
if diff_x < 0:
direct += 180
self._rotate_to(speed, direct)
And a new method _drive_to_2
:
def _drive_to_2(
self,
speed: int,
pos_x: float,
pos_y: float
) -> None:
diff_x = pos_x - self._pos_x
diff_y = pos_y - self._pos_y
dist = math.sqrt(diff_x**2 + diff_y**2)
self._drive_straight(speed, dist)
Again method drive_to
calls the factory:
def drive_to(
self,
speed: int,
pos_x: float,
pos_y: float
) -> None:
self.task_factory(
DRIVE_TYPE_DRIVE_TO,
speed,
pos_x=pos_x,
pos_y=pos_y
).start().join()
This was it! All the movements of the vehicle stop prompt
and can be continued.
Self driving vehicles
We use the Follow Me vehicle (lesson 5) with its infrared sensor. We want the vehicle to follow a course, but stop, when it detects a barrier. When the obstacle disappeares, the vehicle again follows its course (two times the shape of the digit eight). This is not, what Google or the car manufacturers mean by self driving, but it is what we can achieve with about 80 lines of code.
#!/usr/bin/env python3
import ev3, ev3_vehicle, task, struct
sensor = ev3.EV3(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
vehicle = ev3_vehicle.TwoWheelVehicle(0.02128, 0.1175, ev3_obj=sensor)
speed = 50
sensor_gap = 0.1
def look_forward(cont: bool=False):
ops = b''.join([
ev3.opInput_Device,
ev3.READY_RAW,
ev3.LCX(0), # LAYER
ev3.LCX(1), # NO
ev3.LCX(33), # TYPE - EV3-IR
ev3.LCX(0), # MODE - Proximity
ev3.LCX(1), # VALUES
ev3.GVX(0) # VALUE1
])
reply = sensor.send_direct_cmd(ops, global_mem=4)
dist = struct.unpack('<i', reply[5:])[0]
if dist < 10 and not cont:
return True
elif dist >= 10 and cont:
return True
t_drive = task.concat(
task.Repeated(
task.concat(
vehicle.task_factory(
"turn",
speed=speed,
radius_turn=0.25,
angle=360
),
vehicle.task_factory(
"turn",
speed=speed,
radius_turn=-0.25,
angle=360
)
).start,
num=2
),
vehicle.task_factory("stop", brake=True),
task.Sleep(0.5),
vehicle.task_factory("stop")
)
t_controller = task.concat(
task.Task(t_drive.start),
task.Periodic(
sensor_gap,
look_forward
),
task.Task(t_drive.stop),
task.Repeated(
task.concat(
task.Periodic(
sensor_gap,
look_forward,
kwargs={"cont": True}
),
task.Task(t_drive.cont, args=(0,)),
task.Periodic(
sensor_gap,
look_forward
),
task.Task(t_drive.stop)
).start
)
)
t_drive = task.concat(
t_drive,
task.Task(t_controller.stop)
)
t_controller.start()
Again we see this checks and balances behaviour of
two tasks. Task t_controller
starts, stops and
continues task t_drive
, but it
is t_drive
that finally
stops t_controller
, when the course ends.
The interesting part is task t_controller
. This
happens with
task t_drive
: start
, stop
,
then an unlimited number of cont
and stop
. It's important, that the iteration
of Repeated
ends with stop
. Let's look
at an alternative formulation:
t_controller = task.concat(
task.Task(t_drive.start),
task.Repeated(
task.concat(
task.Periodic(
sensor_gap,
look_forward
),
task.Task(t_drive.stop),
task.Periodic(
sensor_gap,
look_forward,
kwargs={"cont": True}
),
task.Task(t_drive.cont, args=(0,))
).start
)
)
This version stops and continues once and never again. It's because the contained task t_drive.cont
is
joined until the driving ends and this prevents the next iteration of the surrounding Repeated
.
Conclusion
The self driving vehicle demonstrates a mechanism of regulation by a controller, that reads sensor values. The task concept has its own coding patterns and it needs some experience to get familiar with it. Coding tasks is not simple, but it helps for an incremental working, where complexity grows step by step. When the construction of the tasks is done, they hide the compexity behind an API which always keeps its simple surface to the outside world.
Recursion makes stopping and continuation to impressive
methods. The optional arguments action_stop
and action_cont
allow even more precision, but
we have seen that sometimes we need subclassing.
Both, the family of tasks
(Task
, Repeated
, Periodic
and Sleep
) and class TwoWheelVehicle
have reached a quality, which allows to realize complex
programs. I hope you have some ideas. It would be great to hear
from.
No comments:
Post a Comment