Introduction
After the sightseeing tour of the last lesson, we now come to the real thing: tools that implement multithreading and help to organize multiple tasks. This lesson requires a solid knowledge of multithreading. If you have not read lesson 7 and are not familiar with the topic, please work through lesson 7 first and then come back.
The basic idea our tools are built upon is the linked list. We call these lists tasks. A task is an object that can be started and then executes a sequence of callables. The final result will be somewhat complex on the inside, while its outside API stays very simple. Therefore we start with a basic version and add functionality step by step.
Combining linked lists with multithreading
Let's look at a first version:
#!/usr/bin/env python3
import threading, typing

STATE_INIT = 'INIT'
STATE_STARTED = 'STARTED'
STATE_FINISHED = 'FINISHED'

def concat(*tasks) -> 'Task':
    chain = None
    for task in tasks:
        assert isinstance(task, Task), 'tasks must be instances of class Task'
        if not chain:
            chain = task
        else:
            chain.append(task)
    return chain

class Task:
    def __init__(
            self,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
    ):
        assert isinstance(action, typing.Callable), \
            "action needs to be a callable"
        assert isinstance(args, tuple), 'args needs to be a tuple'
        assert isinstance(kwargs, dict), 'kwargs needs to be a dictionary'
        self._action = action
        self._args = args
        self._kwargs = kwargs
        self._next = None
        self._root = self
        # the following are root only attributes
        self._state = STATE_INIT
        self._thread = None
        self._lock = threading.Lock()
        self._last = self

    def append(self, task: 'Task') -> 'Task':
        self._lock.acquire()
        assert isinstance(task, Task), "task needs to be a Task instance"
        assert self._root is self, "only root tasks can be appended"
        assert task._root is task, "both tasks need to be root tasks"
        assert self._state in [STATE_INIT, STATE_FINISHED], "can't append to tasks in state " + self._state
        assert task._state in [STATE_INIT, STATE_FINISHED], "can't append tasks in state " + task._state
        self._last._next = task
        self._last = task._last
        task.root = self
        self._lock.release()
        return self

    def start(self) -> 'Task':
        self._lock.acquire()
        assert self._root is self, "only root tasks can be started"
        assert self._state in [STATE_INIT, STATE_FINISHED], "can't start from state " + self._state
        self._state = STATE_STARTED
        self._thread = threading.Thread(target=self._execute)
        self._thread.start()
        return self

    def join(self) -> None:
        self._thread.join()

    def _execute(self) -> None:
        self._root._lock.release()
        self._action(*self._args, **self._kwargs)
        self._root._lock.acquire()
        if self._next:
            self._next._execute()
        else:
            self._root._state = STATE_FINISHED
            self._root._lock.release()
        return

    @property
    def state(self) -> str:
        self._lock.acquire()
        value = self.state_no_lock
        self._lock.release()
        return value

    @property
    def state_no_lock(self) -> str:
        assert self._root is self, "only root tasks can be asked about their state"
        return self._state

    @property
    def lock(self) -> threading.Lock:
        assert self._root is self, "only root tasks can be asked about their lock"
        return self._lock

    @property
    def root(self):
        return self._root

    @root.setter
    def root(self, task):
        self._root = task
        if self._next:
            self._next.root = task
Remarks:
- A task object is a linked list (a chain of tasks). It starts with a root task, which is followed by an unlimited number of links. All of them are Task objects.
- All Task objects are created as root tasks. Method append does what its name says: it appends a root task (a Task object) to another root task. The result is a chain of tasks, in which the appended task is no longer a root task.
- Function concat is syntactic sugar that builds a chain of tasks from a tuple of Task objects.
- All communication with the outside world goes through the root task, which represents the whole chain of tasks. The following tasks in the linked list become inaccessible to the outside world.
- Attributes of all tasks:
  - _action: callable object (e.g. a function or a method).
  - _args: argument list of _action.
  - _kwargs: keyword arguments of _action.
  - _next: pointer to the next Task object in the linked list.
  - _root: pointer to the root task (also a Task object).
- Additional attributes of root tasks:
  - _state: current state of the task (or chain of tasks).
  - _thread: the thread that does the execution (Thread instance from module threading).
  - _lock: the Lock object (from module threading).
  - _last: pointer to the last task in the chain (method append needs it).
- A root task can be identified by self._root is self.
- Method start creates a new thread that does the execution. start itself returns immediately. If you need the started thread for your timing, you can call method join (e.g. task.start().join()).
- Method _execute is the heart of the execution. Every task executes its action and then calls method _execute of the next link in the chain.
- Nearly all methods and properties start by acquiring the Lock object. This guarantees exclusive access and an object in a consistent state. At the end of the method (or property logic), the lock is released.
- The lock is released when _action starts execution and acquired again after _action is finished. This means another thread gets access to a Task object when:
  - its state is STATE_INIT (before it is started),
  - its state is STATE_FINISHED,
  - one of its actions is currently being executed.
- All properties that read changeable data need a locking mechanism. If the outside world has to read more than one property, it can lock explicitly and then call the no-lock version of the properties:
      ...
      my_task.lock.acquire()
      if my_task.state_no_lock == task.STATE_FINISHED:
          print("finished")
      elif my_task.state_no_lock == task.STATE_STARTED:
          print("started")
      else:
          print("initial")
      my_task.lock.release()
      ...
  Without explicit locking, the state could change just between the two accesses (STARTED -> FINISHED), and the program would print an incorrect state (initial). This case is for demonstration only; a better solution reads the property only once:
      ...
      state = my_task.state
      if state == task.STATE_FINISHED:
          print("finished")
      elif state == task.STATE_STARTED:
          print("started")
      else:
          print("initial")
      ...
- The setter of property root is recursive. If set in a former root task, it sets the new root for the whole chain of tasks.
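The design above relies on one property of threading.Lock: a lock acquired in one thread may be released in another. Method start acquires the lock in the calling thread, and _execute releases it in the new thread. The following minimal sketch shows only this mechanism; the names worker and events are made up for the illustration and are not part of class Task.

```python
import threading

lock = threading.Lock()
events = []

def worker():
    # Runs in the new thread, like _execute: it releases a lock
    # that was acquired by a different thread.
    events.append("worker releases")
    lock.release()

lock.acquire()                      # like start: acquired in the calling thread
events.append("main acquired")
thread = threading.Thread(target=worker)
thread.start()
thread.join()
events.append("locked" if lock.locked() else "free")
print(events)
```

This works with threading.Lock, but it would not work with threading.RLock, which must be released by the thread that owns it.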
Task objects versus Thread objects
Actually, Task objects are very similar to Thread objects. Both have methods start and join. The main difference is that Task objects allow building chains and handling a chain like a single Task object.
Parallel executed tasks
As a first test, we run two tasks in parallel:
#!/usr/bin/env python3
import ev3, ev3_sound, ev3_vehicle, task
jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
vehicle = ev3_vehicle.TwoWheelVehicle(0.02128, 0.1175, ev3_obj=jukebox)
speed = 10
t1 = task.Task(
vehicle.drive_turn,
args=(speed, 0, 720)
)
t2 = task.Task(
jukebox.play_song,
args=(ev3_sound.HAPPY_BIRTHDAY,)
)
t1.start()
t2.start()
t1.join()
t2.join()
vehicle.stop()
The vehicle turns two full circles on the spot and plays Happy Birthday. The longer of the two tasks determines the total time.
If we use Thread objects instead of Task objects, this reads as:
#!/usr/bin/env python3
import ev3, ev3_sound, ev3_vehicle, threading
jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
vehicle = ev3_vehicle.TwoWheelVehicle(0.02128, 0.1175, ev3_obj=jukebox)
speed = 10
t1 = threading.Thread(
target=vehicle.drive_turn,
args=(speed, 0, 720)
)
t2 = threading.Thread(
target=jukebox.play_song,
args=(ev3_sound.HAPPY_BIRTHDAY,)
)
t1.start()
t2.start()
t1.join()
t2.join()
vehicle.stop()
There is no real difference, the handling of both alternatives is very similar.
Chains of tasks
We run a chain of tasks:
#!/usr/bin/env python3
import ev3, ev3_sound, ev3_vehicle, task
jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
vehicle = ev3_vehicle.TwoWheelVehicle(0.02128, 0.1175, ev3_obj=jukebox)
speed = 10
task.concat(
task.Task(
vehicle.drive_turn,
args=(speed, 0, 360)
),
task.Task(
vehicle.drive_turn,
args=(speed, 0, 360),
kwargs={"right_turn": True}
),
task.Task(
vehicle.stop
)
).start()
jukebox.play_song(ev3_sound.HAPPY_BIRTHDAY)
The movement of the vehicle is a chain of three links; driving and music run in parallel.
The same program with a Thread object instead of Task objects:
#!/usr/bin/env python3
import ev3, ev3_sound, ev3_vehicle, threading
jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
vehicle = ev3_vehicle.TwoWheelVehicle(0.02128, 0.1175, ev3_obj=jukebox)
speed = 10
def do_it():
    vehicle.drive_turn(speed, 0, 360)
    vehicle.drive_turn(speed, 0, 360, right_turn=True)
    vehicle.stop()
threading.Thread(
target=do_it
).start()
jukebox.play_song(ev3_sound.HAPPY_BIRTHDAY)
O.k., it works, but in its current state class Task offers no real progress over Thread objects. It needs additional functionality to become a valuable tool.
Repeated actions
Let's look at some scenarios and check whether our class Task is able to execute them. Here they are:
- Every 0.2 sec. your EV3 device reads the current free distance from its infrared sensor. This is repeated 100 times. The times and data are written to a file.
- Your application drives a vehicle: every 0.5 sec. it tests whether there is a barrier in front of the vehicle. If the distance is smaller than 30 cm, it stops the motors.
- Your application watches a door: every 5 sec. it tests whether the touch sensor is touched. If not, it plays a sound signal.
- Your application is a vehicle that follows a light. Every 20 sec. its light sensor scans all directions for the brightest source of light. Then the orientation of the vehicle is changed to this direction.
Recall class TwoWheelVehicle of lesson 6. There we had a very similar situation, where functions _test_pos and _test_o returned numbers:
- value == -1: signals the caller to finish the loop.
- value > 0: value is the time to wait until the next call (in sec.).
- value == 0: call again without any waiting.
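The return value protocol can be sketched as a small loop. This is only an illustration of the three cases, not the code of class Task: the function run_repeated and its parameter sleep are hypothetical names, and the sleep function is injectable so that the example runs without real waiting.

```python
import time

def run_repeated(action, sleep=time.sleep):
    """Call action until it returns -1; return the number of calls."""
    calls = 0
    while True:
        gap = action()
        calls += 1
        if gap == -1:       # finish the loop
            return calls
        if gap > 0:         # wait before the next call
            sleep(gap)
        # gap == 0: call again without any waiting

# A fake action that returns a prepared sequence of values
values = iter([0.2, 0, 0.2, -1])
waits = []
calls = run_repeated(lambda: next(values), sleep=waits.append)
print(calls, waits)
```

Here the waiting times are only recorded in the list waits instead of being slept.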
We need some new constants and we import modules time and numbers:
#!/usr/bin/env python3
import threading, typing, time, numbers
...
ACTIVITY_NONE = 'NONE'
ACTIVITY_BUSY = 'BUSY'
ACTIVITY_SLEEP = 'SLEEP'
We change the constructor of class Task:
    def __init__(
            self,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
    ):
        ...
        self._num = 0
        self._cnt = 0
        # the following are root only attributes
        ...
        self._cond = threading.Condition(self._lock)
        self._activity = ACTIVITY_NONE
        self._time_action = None
The meaning of the new attributes:
- _num: maximum number of calls of _action (value 0 stands for unlimited).
- _cnt: counter of _action calls.
- _cond: used for interruptible sleeping.
- _activity: current activity type (ACTIVITY_NONE, ACTIVITY_BUSY or ACTIVITY_SLEEP).
- _time_action: time of the current call of _action (when _activity is ACTIVITY_BUSY) or of the next call of _action (when _activity is ACTIVITY_SLEEP).
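Why a Condition object makes sleeping interruptible can be shown in isolation. The sketch below is an assumption about how such a sleep may be interrupted later; it does not use class Task, and the names sleeper, state and result are invented for the example. It uses wait_for with a flag, so a notification that arrives before the sleeper starts waiting is not lost.

```python
import threading, time

lock = threading.Lock()
cond = threading.Condition(lock)    # same construction as in the constructor
state = {"stop": False}
result = {}

def sleeper():
    with cond:                      # waiting requires the lock to be held
        begin = time.monotonic()
        # Sleeps up to 10 sec., but returns early when the flag is set
        result["interrupted"] = cond.wait_for(lambda: state["stop"], timeout=10)
        result["elapsed"] = time.monotonic() - begin

thread = threading.Thread(target=sleeper)
thread.start()
time.sleep(0.1)
with cond:
    state["stop"] = True
    cond.notify()                   # wakes the sleeper long before the timeout
thread.join()
print(result["interrupted"], result["elapsed"] < 5)
```

While the sleeper waits, the lock is released, so other threads can read the state of the object. A plain time.sleep would offer no way to end the waiting early.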
We add two lines of code to method start:
    def start(self) -> 'Task':
        ...
        self._cnt = 0
        self._time_action = time.time()
        self._thread = threading.Thread(target=self._execute)
        self._thread.start()
        return self
The new version of method _execute:
    def _execute(self) -> None:
        while True:
            gap = self._wrapper()
            self._cnt += 1
            if gap == -1 or self._num > 0 and self._cnt >= self._num:
                self._root._time_action = time.time()
                break
            if gap == 0:
                self._root._time_action = time.time()
                continue
            self._root._time_action += gap
            real_gap = self._root._time_action - time.time()
            if real_gap > 0:
                self._root._activity = ACTIVITY_SLEEP
                self._root._cond.wait(real_gap)
                self._root._activity = ACTIVITY_NONE
        self._root._time_action = time.time()
        gap = 0
        if gap > 0:
            self._root._activity = ACTIVITY_SLEEP
            self._root._cond.wait(gap)
            self._root._activity = ACTIVITY_NONE
        if self._next:
            self._next._cnt = 0
            self._next._execute()
        else:
            self._final()
with new methods that wrap _action:
    def _wrapper(self) -> int:
        self._wrapper1()
        self._action(*self._args, **self._kwargs)
        self._wrapper2()
        return -1

    def _wrapper1(self):
        self._root._activity = ACTIVITY_BUSY
        self._root._lock.release()

    def _wrapper2(self):
        self._root._lock.acquire()
        self._root._activity = ACTIVITY_NONE
Returning the value -1 guarantees that the loop is executed once and that no sleeping happens. This sounds odd: we code some new logic that is not used. The explanation is: Task objects will never use it, but their subclasses will! It allows subclassing Task with an unchanged method _execute but a new method _wrapper.
We add a new method that leaves the attributes in a consistent combination when the execution ends:
    def _final(self) -> None:
        self._root._state = STATE_FINISHED
        self._root._time_action = None
        self._root._lock.release()
We add some properties:
    @property
    def time_action(self) -> float:
        self._root._lock.acquire()
        assert self._root is self, 'only root tasks can be asked about their time_action'
        value = self.time_action_no_lock
        self._root._lock.release()
        return value

    @property
    def time_action_no_lock(self) -> float:
        return self._time_action

    @property
    def activity(self) -> str:
        self._root._lock.acquire()
        assert self._root is self, 'only root tasks can be asked about their activity'
        value = self.activity_no_lock
        self._root._lock.release()
        return value

    @property
    def activity_no_lock(self) -> str:
        return self._activity
Class Repeated
We subclass Task:
class Repeated(Task):
    def __init__(
            self,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
            num: int=0
    ):
        super().__init__(action, args, kwargs)
        assert isinstance(num, int), 'num must be an integer'
        assert num >= 0, 'num must be positive'
        self._num = num

    def _wrapper(self):
        self._wrapper1()
        value = self._action(*self._args, **self._kwargs)
        assert isinstance(value, Task) or \
            isinstance(value, numbers.Number) or \
            isinstance(value, bool) or \
            value is None, \
            'action needs to return a task, a number, a boolean or None'
        assert not isinstance(value, numbers.Number) or \
            value == -1 or \
            value >= 0, \
            'if action return a number, it must be positive or -1'
        if value is True:
            rc = -1
        elif isinstance(value, Task) or value is False or value is None:
            rc = 0
        else:
            rc = value
        self._wrapper2()
        return rc
It's the callable _action that controls the execution by its return value. There are three types of return values (None, bool, number):
- None: There is no waiting, and the number of calls of _action must be limited by num.
- bool: There is no waiting; the calling ends when the value True is returned.
- number:
  - -1: no further call (of _action).
  - 0: no waiting, call again as soon as possible.
  - > 0: wait, then call again.
num is an alternative way to control the execution. If set, it is an upper limit for the number of calls of _action. It must be set when the Repeated object is constructed.
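The mapping from return values to waiting times can be written as a small standalone function. This is a sketch that mirrors the logic of method _wrapper of class Repeated outside the class; the name gap_from_return_value is made up, and the case of a returned Task object is left out to keep the example self-contained.

```python
def gap_from_return_value(value):
    """Map the return value of an action to the gap used by the loop."""
    # bool is tested with "is", because True == 1 and False == 0 in Python
    if value is True:
        return -1           # end the calling
    if value is False or value is None:
        return 0            # call again without waiting
    return value            # a number: -1, 0 or a waiting time in sec.

gaps = [gap_from_return_value(v) for v in (True, False, None, 0.5, -1)]
print(gaps)
```

The order of the tests matters: a test like value == 1 would also match True and turn "end the calling" into "wait one second".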
Tests
We play the triad (TRIAS) three times:
#!/usr/bin/env python3
import ev3, ev3_sound, task
jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
task.Repeated(
jukebox.play_song,
args=(ev3_sound.TRIAS,),
num=3
).start()
Task factories
I feel confident in the task concept and try to organize as many actions as possible in tasks. The best way to reach this goal is to implement tasks in the subclasses of class EV3.
Method song of class Jukebox
We start with class Jukebox and add a method song:
    def song(self, song: dict) -> task.Task:
        return task.concat(
            task.Task(
                self._init_tone
            ),
            task.Repeated(
                self._next_tone,
                args=(song,)
            ),
            task.Task(
                self.stop
            )
        )
This method returns a Task object that plays the song when started. It is a chain of tasks with three links, but this is internal information. The outside world knows: it's a task, which can be concatenated or started. We test it with this program:
#!/usr/bin/env python3
import ev3, ev3_sound, task
jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.song(ev3_sound.HAPPY_BIRTHDAY).start()
We miss the light show, which will be added soon.
Everything seems to work. If you look at the scenarios described above, we already have the tools to realize them. Periodic, our next subclass of Task, will be syntactic sugar. It saves us from writing wrappers around callables.
Periodic actions
All of our scenarios know the time interval between two calls of _action from the beginning. This allows passing it as an argument to the constructor of the object. It does not help transparency when the time interval is returned by _action. Our new subclass Periodic fits this situation better:
class Periodic(Task):
    def __init__(
            self,
            intervall: float,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
            num: int=0
    ):
        super().__init__(action, args, kwargs)
        assert isinstance(intervall, numbers.Number), 'intervall must be a number'
        assert intervall >= 0, 'intervall must be positive'
        assert isinstance(num, int), 'num must be an integer'
        assert num >= 0, 'num must be positive'
        self._intervall = intervall
        self._num = num

    def _wrapper(self):
        self._wrapper1()
        value = self._action(*self._args, **self._kwargs)
        assert isinstance(value, Task) or isinstance(value, bool) or value is None, \
            'action needs to return a task, a boolean or None'
        if value is True:
            rc = -1
        else:
            rc = self._intervall
        self._wrapper2()
        return rc
The waiting time is set when the constructor is called (parameter intervall). I think this is a good and descriptive variant.
Look at method _wrapper: the callable _action may return two types of values (None, bool):
- None: You have to limit the number of calls of _action by setting num.
- bool: The calling ends when the value True is returned.
Test
We play the triad (TRIAS) three times, once per minute:
#!/usr/bin/env python3
import ev3, ev3_sound, task
jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
task.Periodic(
60,
jukebox.play_song,
args=(ev3_sound.TRIAS,),
num=3
).start()
Exact timing
For the moment, exact timing works for repetitions but not for chains of tasks. We want:
- the possibility to switch to net time (meaning that the waiting for the next call of _action does not depend on the duration of its last execution),
- a chance to set a fixed duration for a task.
We extend the constructor of class Task:
    def __init__(
            self,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
            duration: float=None
    ):
        ...
        assert duration is None or isinstance(duration, numbers.Number), \
            'duration needs to be a number'
        assert duration is None or duration >= 0, \
            'duration needs to be positive'
        ...
        self._duration = duration
        self._time_end = None
        self._netto_time = False
        # the following are root only attributes
        ...
The meanings of the new attributes:
- _duration: duration of this task. If the task finishes later, the following tasks have to compensate.
- _time_end: holds the time when this task has to be finished (it is set when the task is executed).
- _netto_time: flag that the waiting is net (the execution time of the action counts extra).
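The difference between the two timing modes is plain arithmetic and can be simulated without any waiting. The following sketch assumes a fixed gap and known execution times; the function call_times and its parameters are invented for the illustration, and the variable planned plays the role of _time_action.

```python
def call_times(gap, durations, netto_time=False, start=0.0):
    """Return the start time of every call for the given execution times."""
    times = []
    planned = start
    now = start
    for duration in durations:
        times.append(now)
        now += duration             # the action is executed
        if netto_time:
            planned = now + gap     # the gap counts from the end of the action
        else:
            planned += gap          # the gap counts from the planned start
        now = max(now, planned)     # wait only if the planned time lies ahead
    return times

gross = call_times(1, [0.25, 0.25, 0.25])
net = call_times(1, [0.25, 0.25, 0.25], netto_time=True)
print(gross)
print(net)
```

With the default timing the calls stay on a grid of one second, independent of the execution times. With net time every execution time shifts all later calls.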
We change method start:
    def start(self) -> 'Task':
        self._lock.acquire()
        assert self._root is self, "only root tasks can be started"
        assert self._state in [STATE_INIT, STATE_FINISHED], \
            "can't start from state " + self._state
        self._state = STATE_STARTED
        self._cnt = 0
        self._time_action = time.time()
        if self._duration is not None:
            self._time_end = self._time_action + self._duration
        self._thread = threading.Thread(target=self._execute)
        self._thread.start()
        return self
The updated method _execute:
    def _execute(self) -> None:
        while True:
            gap = self._wrapper()
            self._cnt += 1
            if gap == -1 or self._num > 0 and self._cnt >= self._num:
                self._root._time_action = time.time()
                break
            if gap == 0:
                self._root._time_action = time.time()
                continue
            if self._netto_time:
                self._root._time_action = time.time() + gap
                real_gap = gap
            else:
                self._root._time_action += gap
                real_gap = self._root._time_action - time.time()
            if real_gap > 0:
                self._root._activity = ACTIVITY_SLEEP
                self._root._cond.wait(real_gap)
                self._root._activity = ACTIVITY_NONE
        if self._time_end:
            self._root._time_action = self._time_end
            self._time_end = None
            gap = self._root._time_action - time.time()
            if gap > 0:
                self._root._activity = ACTIVITY_SLEEP
                self._root._cond.wait(gap)
                self._root._activity = ACTIVITY_NONE
        else:
            self._root._time_action = time.time()
        if self._next:
            self._next._cnt = 0
            if self._next._duration != None:
                self._next._time_end = self._root._time_action + self._next._duration
            self._next._execute()
        else:
            self._final()
There is an additional sleep at the end of the task. It guarantees that the task lasts at least the duration that was set by the constructor.
_netto_time is relevant for the classes Repeated and Periodic only. We need to change their constructors. The new constructor of class Repeated:
    def __init__(
            self,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
            num: int=0,
            duration: float=0,
            netto_time: bool=False
    ):
        super().__init__(action, args, kwargs, duration=duration)
        assert isinstance(num, int), 'num must be an integer'
        assert num >= 0, 'num must be positive'
        assert isinstance(netto_time, bool), 'netto_time must be a bool value'
        self._num = num
        self._netto_time = netto_time
The new constructor of class Periodic:
    def __init__(
            self,
            intervall: numbers.Number,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
            num: int=0,
            duration: float=0,
            netto_time: bool=False
    ):
        super().__init__(action, args, kwargs, duration=duration)
        assert isinstance(intervall, numbers.Number), 'intervall must be a number'
        assert intervall >= 0, 'intervall must be positive'
        assert isinstance(num, int), 'num must be an integer'
        assert num >= 0, 'num must be positive'
        assert isinstance(netto_time, bool), 'netto_time must be a bool value'
        self._intervall = intervall
        self._num = num
        self._netto_time = netto_time
Subclass Sleep
The duration mechanism allows us to create another subclass:
class Sleep(Task):
    def __init__(self, seconds: float):
        super().__init__(self._do_nothing, duration=seconds)

    def _do_nothing(self): return -1
Class Sleep will become stoppable, but that lies in the future. For the moment it is just an alternative to:
    Task(time.sleep, args=(seconds,))
with the advantage that it is able to compensate the time deviations of previous tasks.
Tests
We test the compensation:
#!/usr/bin/env python3
import ev3, ev3_sound, task, time
jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1
task.concat(
task.Task(
time.sleep,
args=(1,),
duration=0.5
),
task.Periodic(
1,
jukebox.play_tone,
args=("a'", 0.1),
num=4
)
).start()
Sleeping for one second with a duration of 0.5 sec.! The chain of tasks will be half a second late when it has finished its first task (sleep). The second task (play_tone) has to compensate this delay. Its output:
10:27:54.356145 Sent 0x|0D:00|2A:00|80|00:00|94:01:01:82:B8:01:81:64|
10:27:54.855296 Sent 0x|0D:00|2B:00|80|00:00|94:01:01:82:B8:01:81:64|
10:27:55.854861 Sent 0x|0D:00|2C:00|80|00:00|94:01:01:82:B8:01:81:64|
10:27:56.854826 Sent 0x|0D:00|2D:00|80|00:00|94:01:01:82:B8:01:81:64|
Between the first call of play_tone and the second we see a time interval of half a second, which compensates the delay. From then on the interval is one second. We see that the use of duration allows exact timing but can also prevent it.
We test class Sleep with a program that uses tasks for a little light show:
#!/usr/bin/env python3
import task, ev3, ev3_sound
jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1
t_green = task.Periodic(2, jukebox.change_color, args=(ev3.LED_GREEN,), num=4)
t_red = task.Periodic(4, jukebox.change_color, args=(ev3.LED_RED,), num=2)
t_orange = task.Periodic(4, jukebox.change_color, args=(ev3.LED_ORANGE,), num=2)
t = task.concat(
task.Task(t_red.start),
task.Sleep(1),
task.Task(t_green.start),
task.Sleep(1),
task.Task(t_orange.start),
task.Task(t_green.join)
)
t.start().join()
print("done")
Here we have four tasks: t_green, t_red, t_orange and t. t is the master that schedules the starting of the three other tasks. After t_orange is started, t joins t_green. We want its thread to be joinable (for timing purposes, the join has to end when the light show ends). Its output:
11:26:32.103894 Sent 0x|08:00|2A:00|80|00:00|82:1B:02|
11:26:33.105693 Sent 0x|08:00|2B:00|80|00:00|82:1B:01|
11:26:34.107247 Sent 0x|08:00|2C:00|80|00:00|82:1B:03|
11:26:35.106055 Sent 0x|08:00|2D:00|80|00:00|82:1B:01|
11:26:36.104310 Sent 0x|08:00|2E:00|80|00:00|82:1B:02|
11:26:37.106052 Sent 0x|08:00|2F:00|80|00:00|82:1B:01|
11:26:38.107876 Sent 0x|08:00|30:00|80|00:00|82:1B:03|
11:26:39.106062 Sent 0x|08:00|31:00|80|00:00|82:1B:01|
done
It works, and we learned that tasks can start other tasks. We plan to make our tasks stoppable, which means that tasks inside of tasks (contained tasks) need to be stopped too. The conclusion is that we have to change class Task once more.
Contained tasks
We add a class attribute _contained_register to class Task:
class Task:
    _contained_register = {}
Remarks:
- This will hold all contained tasks with their parent tasks. It's a class attribute that exists once and can be accessed from all instances of class Task. It allows asking whether task1 is a child of (contained in) task2:
      if task1 in task.Task._contained_register and \
         task.Task._contained_register[task1] is task2:
          print(task1, "is child of", task2)
- The access is from the child to the parent. We delete an entry when a task is finished:
      self._root._state = STATE_FINISHED
      if self._root in self._contained_register:
          self._contained_register.pop(self._root)
We add some code to the constructor:
class Task:
    def __init__(
            self,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
            join: bool=False,
            duration: float=None
    ):
        ...
        assert isinstance(join, bool), 'join needs to be a bool value'
        assert not join or hasattr(action, '__self__'), 'only bounded methods can be joined'
        assert not join or isinstance(action.__self__, Task), 'only instances of Task can be joined'
        assert not join or action.__name__ == "start", 'only method start can be joined'
        ...
        self._join = join
        ...
        # the following are root only attributes
        ...
        self._contained = []
The meanings of the new attributes:
- _join: A flag that signals whether the contained task will be joined when it is started. This means the starting task waits until the started task is finished.
- _contained: Holds the tasks which were started as contained (child) tasks and may still run. When the task is stopped, these have to be stopped too. This is the opposite direction to _contained_register: from the parent to the child. It's a list because a task may have multiple contained tasks.
Methods _wrapper1 and _wrapper2 also need to be changed:
    def _wrapper1(self) -> None:
        if hasattr(self._action, '__self__') and \
           isinstance(self._action.__self__, Task) and \
           self._action.__name__ in ["start", "join"]:
            task = self._action.__self__
            name = self._action.__name__
            if name == "start":
                if not task in self._root._contained:
                    self._root._contained.append(task)
                self._contained_register.update({task: self._root})
        if not hasattr(self._action, '__self__') or \
           not isinstance(self._action.__self__, Task) or \
           not self._action.__name__ == "start" or \
           self._action.__name__ == "start" and self._join:
            self._root._activity = ACTIVITY_BUSY
            self._root._lock.release()

    def _wrapper2(self) -> None:
        if self._join:
            self._action.__self__._thread.join()
        if not hasattr(self._action, '__self__') or \
           not isinstance(self._action.__self__, Task) or \
           not self._action.__name__ == "start" or \
           self._action.__name__ == "start" and self._join:
            self._root._lock.acquire()
            self._root._activity = ACTIVITY_NONE
        if hasattr(self._action, '__self__') and \
           isinstance(self._action.__self__, Task) and \
           self._action.__name__ in ["start", "join"]:
            task = self._action.__self__
            state = task.state
            if state == STATE_FINISHED and \
               task in self._root._contained:
                self._root._contained.remove(task)
This adds all started tasks to the root task's list _contained and guarantees that the root task knows all contained tasks which run in parallel.
A few remarks:
- Attribute __self__ of a bound method holds the object the method belongs to.
- Attribute __name__ of a callable holds its name.
- These two attributes help to identify contained tasks (_action is method start of a Task instance):
      if hasattr(self._action, '__self__') and \
         isinstance(self._action.__self__, Task) and \
         self._action.__name__ == "start":
- Starting a task is not time consuming, therefore we do not release the lock. We release it only if we join the started task.
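The two attributes can be tried out with any class. The sketch below uses a stand-in class Starter instead of Task, and the helper is_start_of is a hypothetical name; it only repeats the test from the remarks above in a self-contained form.

```python
class Starter:
    def start(self):
        return "started"

def plain_function():
    return "plain"

def is_start_of(action, cls) -> bool:
    """True if action is the bound method start of an instance of cls."""
    return (
        hasattr(action, '__self__')
        and isinstance(action.__self__, cls)
        and action.__name__ == "start"
    )

obj = Starter()
bound = obj.start                   # a bound method remembers its object
checks = (
    bound.__self__ is obj,
    bound.__name__,
    is_start_of(bound, Starter),
    is_start_of(plain_function, Starter),
)
print(checks)
```

A function defined with def has no attribute __self__, so the first test already rejects it. Built-in functions like len do have this attribute, which is why the isinstance test is needed as well.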
Method _final needs to be changed:
    def _final(self) -> None:
        self._root._contained = self._join_contained()
        self._root._state = STATE_FINISHED
        if self._root in self._contained_register:
            self._contained_register.pop(self._root)
        self._root._time_action = None
        self._root._lock.release()
with a new method:
    def _join_contained(self) -> list:
        contained = self._root._contained
        self._root._activity = ACTIVITY_JOIN
        self._root._lock.release()
        not_finished = []
        for task in contained:
            if not task in self._contained_register or \
               not self._contained_register[task] is self._root:
                continue
            task.join()
            if task.state != STATE_FINISHED:
                not_finished.append(task)
        self._root._lock.acquire()
        self._root._activity = ACTIVITY_NONE
        return not_finished
The call of method _join_contained guarantees that a task is never finished before all its contained tasks are finished.
We add a new activity:
ACTIVITY_NONE = 'NONE'
ACTIVITY_BUSY = 'BUSY'
ACTIVITY_SLEEP = 'SLEEP'
ACTIVITY_JOIN = 'JOIN'
We change property time_action_no_lock:
    @property
    def time_action_no_lock(self) -> float:
        min = None
        if not hasattr(self._action, '__self__') or \
           not isinstance(self._action.__self__, Task) or \
           not self._action.__name__ in ["start", "join"]:
            min = self._time_action
        for task in self._contained:
            act = task.time_action
            if min is None:
                min = act
            elif act != None and act < min:
                min = act
        return min
The earliest time of any action is returned, be it of the task's own action or of one of its directly or indirectly contained tasks (recursion).
Starting or joining a task is no action, so we ignore both. What counts is the next action of the contained task, which is found by recursion.
I think it is not a good idea for instances of Repeated or Periodic to start unjoined tasks. Therefore we don't add a parameter join to their constructors. This means they can contain tasks, but these are always joined tasks. The constructor of Repeated:
    def __init__(
            self,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
            num: int=0,
            duration: float=0,
            netto_time: bool=False
    ):
        if hasattr(action, '__self__') and \
           isinstance(action.__self__, Task) and \
           action.__name__ == "start":
            super().__init__(action, args, kwargs, join=True, duration=duration)
        else:
            super().__init__(action, args, kwargs, duration=duration)
        ...
The constructor of Periodic:
    def __init__(
            self,
            intervall: float,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
            num: int=0,
            duration: float=0,
            netto_time: bool=False
    ):
        if hasattr(action, '__self__') and \
           isinstance(action.__self__, Task) and \
           action.__name__ == "start":
            super().__init__(action, args, kwargs, join=True, duration=duration)
        else:
            super().__init__(action, args, kwargs, duration=duration)
        ...
Testing contained classes
We change our lightshow-program:
#!/usr/bin/env python3
import task, ev3, ev3_sound
jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1
t_green = task.Periodic(2, jukebox.change_color, args=(ev3.LED_GREEN,), num=4)
t_red = task.Periodic(4, jukebox.change_color, args=(ev3.LED_RED,), num=2)
t_orange = task.Periodic(4, jukebox.change_color, args=(ev3.LED_ORANGE,), num=2)
task.concat(
task.Task(t_red.start),
task.Sleep(1),
task.Task(t_green.start),
task.Sleep(1),
task.Task(t_orange.start)
).start().join()
print("done")
There is no need to join t_green any more; this is done internally. This program's output:
12:39:08.368138 Sent 0x|08:00|2A:00|80|00:00|82:1B:02|
12:39:09.370120 Sent 0x|08:00|2B:00|80|00:00|82:1B:01|
12:39:10.373537 Sent 0x|08:00|2C:00|80|00:00|82:1B:03|
12:39:11.371645 Sent 0x|08:00|2D:00|80|00:00|82:1B:01|
12:39:12.368623 Sent 0x|08:00|2E:00|80|00:00|82:1B:02|
12:39:13.370486 Sent 0x|08:00|2F:00|80|00:00|82:1B:01|
12:39:14.374323 Sent 0x|08:00|30:00|80|00:00|82:1B:03|
12:39:15.370430 Sent 0x|08:00|31:00|80|00:00|82:1B:01|
done
Modifying the task factory
We add another method to class Jukebox, which is also a task factory.
Method sound
Method sound plays a sound file. Its API:
| sound(self, path:str, duration:float=None, repeat:bool=False) -> task.Task
| returns a Task object, that plays a sound file
|
| Attributes:
| path: name of the sound file (without extension ".rsf")
|
| Keyword Attributes:
| duration: duration of the sound file (in sec.)
| repeat: flag, if repeatedly playing
The code:
def sound(self, path: str, duration: float=None, repeat: bool=False) -> task.Task:
    if repeat:
        ops = b''.join([
            ev3.opSound,
            ev3.REPEAT,
            ev3.LCX(self._volume),  # VOLUME
            ev3.LCS(path)  # NAME
        ])
    else:
        ops = b''.join([
            ev3.opSound,
            ev3.PLAY,
            ev3.LCX(self._volume),  # VOLUME
            ev3.LCS(path)  # NAME
        ])
    if not duration:
        return task.Task(
            self.send_direct_cmd,
            args=(ops,)
        )
    elif not repeat:
        return task.Task(
            self.send_direct_cmd,
            args=(ops,),
            duration=duration
        )
    else:
        return task.concat(
            task.Task(
                self.send_direct_cmd,
                args=(ops,),
                duration=duration
            ),
            task.Task(self.stop)
        )
We test it with this program:
#!/usr/bin/env python3
import ev3, ev3_sound
jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1
t = jukebox.sound('./ui/DownloadSucces', duration=5, repeat=True).start()
Its output:
12:43:28.528101 Sent 0x|1D:00|2A:00|80|00:00|94:03:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
12:43:33.528569 Sent 0x|07:00|2B:00|80|00:00|94:00|
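The payload at the end of the first output line can be reproduced by hand. The sketch below is not the ev3 module. The constants and the helpers lcs and sound_ops are assumptions read off the logged bytes: 0x94 for opSound, 0x03 for REPEAT, 0x02 for PLAY, and a string sent as 0x84, its ASCII characters and a closing 0x00. Encoding the volume as a single byte is only claimed for the small value 1 used here.

```python
# Assumed constants, taken from the bytes visible in the logged commands.
OP_SOUND = b'\x94'
PLAY = b'\x02'
REPEAT = b'\x03'


def lcs(value: str) -> bytes:
    """Encode a string as seen in the log: 0x84, ASCII bytes, 0x00."""
    return b'\x84' + value.encode('ascii') + b'\x00'


def sound_ops(path: str, volume: int = 1, repeat: bool = False) -> bytes:
    """Build the operation bytes for playing a sound file (sketch only)."""
    cmd = REPEAT if repeat else PLAY
    return b''.join([OP_SOUND, cmd, bytes([volume]), lcs(path)])


ops = sound_ops('./ui/DownloadSucces', repeat=True)
print(':'.join(f'{b:02X}' for b in ops))
# 94:03:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00
```

The second output line ends with 94:00, which is the opSound byte followed by the byte sent for BREAK in method stop.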
A second test, where we wrap a Repeated around the Task:
#!/usr/bin/env python3
import ev3, ev3_sound, task
jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1
t = task.Repeated(
    jukebox.sound('./ui/DownloadSucces', duration=2).start,
    num=3
).start()
The output:
12:45:46.594595 Sent 0x|1D:00|2A:00|80|00:00|94:02:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
12:45:48.612966 Sent 0x|1D:00|2B:00|80|00:00|94:02:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
12:45:50.614405 Sent 0x|1D:00|2C:00|80|00:00|94:02:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
Method song
Now we add the lightshow to method song. First we add an attribute _pos_led to the constructor of class Jukebox:
class Jukebox(ev3.EV3):
    def __init__(self, protocol: str=None, host: str=None, ev3_obj=None):
        super().__init__(protocol=protocol, host=host, ev3_obj=ev3_obj)
        self._volume = 1
        self._temperament = 440
        self._pos_tone = None
        self._pos_led = None
        self._plays = False
Then we add the methods _init_color and _next_color to class Jukebox:
def _init_color(self) -> None:
    self._pos_led = 0

def _next_color(self, song) -> bool:
    if not self._plays:
        return True
    self.change_color(song["led_sequence"][self._pos_led])
    self._pos_led += 1
    self._pos_led %= len(song["led_sequence"])
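The position counter wraps around with the modulo operator, so the LED sequence repeats for as long as the song plays. A minimal sketch of this behaviour without an EV3 device, assuming a hypothetical stand-in class ColorCycler that records the colors instead of sending them, and a made-up led_sequence:

```python
class ColorCycler:
    """Hypothetical stand-in for the color-related part of Jukebox."""

    def __init__(self):
        self._pos_led = None
        self._plays = True      # assumption: set elsewhere when a song starts
        self.shown = []         # records colors instead of sending them

    def change_color(self, color):
        self.shown.append(color)

    def _init_color(self):
        self._pos_led = 0

    def _next_color(self, song):
        if not self._plays:
            return True         # tells a Periodic to end its loop
        self.change_color(song["led_sequence"][self._pos_led])
        self._pos_led += 1
        self._pos_led %= len(song["led_sequence"])


song = {"led_sequence": ["RED", "GREEN", "ORANGE"]}  # made-up sequence
cycler = ColorCycler()
cycler._init_color()
for _ in range(4):
    cycler._next_color(song)
print(cycler.shown)              # ['RED', 'GREEN', 'ORANGE', 'RED']

cycler._plays = False
print(cycler._next_color(song))  # True
```

While the song plays, the method falls through and returns None, which Periodic treats like False: the next call will follow.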
We modify method stop:
def stop(self) -> None:
    self.send_direct_cmd(ev3.opSound + ev3.BREAK)
    if self._plays:
        self._plays = False
        self.change_color(ev3.LED_GREEN)
We modify method song:
def song(self, song: dict) -> task.Task:
    tones = task.concat(
        task.Task(self._init_tone),
        task.Repeated(
            self._next_tone,
            args=(song,)
        ),
        task.Task(self.stop)
    )
    colors = task.Periodic(
        60 * song["beats_per_bar"] / song["tempo"],
        self._next_color,
        args=(song,)
    )
    if "upbeat" in song:
        colors = task.concat(
            task.Sleep(60 * song["upbeat"] / song["tempo"]),
            colors
        )
    colors = task.concat(
        task.Task(self._init_color),
        colors
    )
    return task.concat(
        task.Task(tones.start),
        task.Task(colors.start),
        task.Task(tones.join)
    )
There is a little trick in this code. Up to now, we can't stop a Task object. Here we need to stop task colors when task tones is finished. This is done by method stop, which sets attribute _plays = False. Attribute _plays signals method _next_color to return value True, which ends the loop of the Periodic. This isn't good style and we will simplify the code when our tasks can be stopped. For the moment we are happy with the solution and we test it:
#!/usr/bin/env python3
import task, ev3, ev3_sound_tmp as ev3_sound
jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1
task.concat(
    jukebox.song(ev3_sound.HAPPY_BIRTHDAY),
    task.Sleep(2),
    jukebox.song(ev3_sound.TRIAS)
).start()
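The interval passed to Periodic in method song is the length of one bar in seconds, and the initial Sleep is the length of the upbeat. A small worked example, assuming tempo is given in beats per minute and using a made-up song dictionary (the values are not those of HAPPY_BIRTHDAY or TRIAS; the helper names are hypothetical):

```python
def bar_seconds(song: dict) -> float:
    """Length of one bar in seconds: 60 s per minute * beats per bar / tempo."""
    return 60 * song["beats_per_bar"] / song["tempo"]


def upbeat_seconds(song: dict) -> float:
    """Length of the upbeat in seconds (0 if the song has none)."""
    return 60 * song.get("upbeat", 0) / song["tempo"]


# Made-up example: 120 beats per minute, 3 beats per bar, 1 beat of upbeat.
example = {"tempo": 120, "beats_per_bar": 3, "upbeat": 1}
print(bar_seconds(example))     # 1.5
print(upbeat_seconds(example))  # 0.5
```

With these values the color would change every 1.5 seconds, starting 0.5 seconds after the first tone.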
Last Test
As the last test of this lesson, we pack a Periodic into a Periodic into a Periodic:
#!/usr/bin/env python3
import task, ev3, ev3_sound
jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1
t_inner = task.Periodic(0.25, jukebox.play_tone, args=("c", 0.1), duration=1, num=4)
t_middle = task.Periodic(2, t_inner.start, num=2)
t_outer = task.Periodic(2, t_middle.start, num=2, netto_time=True)
t_outer.start()
Its output:
12:54:55.427018 Sent 0x|0D:00|2A:00|80|00:00|94:01:01:82:83:00:81:64|
12:54:55.678632 Sent 0x|0D:00|2B:00|80|00:00|94:01:01:82:83:00:81:64|
12:54:55.928161 Sent 0x|0D:00|2C:00|80|00:00|94:01:01:82:83:00:81:64|
12:54:56.177827 Sent 0x|0D:00|2D:00|80|00:00|94:01:01:82:83:00:81:64|
12:54:57.427841 Sent 0x|0D:00|2E:00|80|00:00|94:01:01:82:83:00:81:64|
12:54:57.678454 Sent 0x|0D:00|2F:00|80|00:00|94:01:01:82:83:00:81:64|
12:54:57.928489 Sent 0x|0D:00|30:00|80|00:00|94:01:01:82:83:00:81:64|
12:54:58.178475 Sent 0x|0D:00|31:00|80|00:00|94:01:01:82:83:00:81:64|
12:55:00.430093 Sent 0x|0D:00|32:00|80|00:00|94:01:01:82:83:00:81:64|
12:55:00.680987 Sent 0x|0D:00|33:00|80|00:00|94:01:01:82:83:00:81:64|
12:55:00.930972 Sent 0x|0D:00|34:00|80|00:00|94:01:01:82:83:00:81:64|
12:55:01.181053 Sent 0x|0D:00|35:00|80|00:00|94:01:01:82:83:00:81:64|
12:55:02.430725 Sent 0x|0D:00|36:00|80|00:00|94:01:01:82:83:00:81:64|
12:55:02.681313 Sent 0x|0D:00|37:00|80|00:00|94:01:01:82:83:00:81:64|
12:55:02.931478 Sent 0x|0D:00|38:00|80|00:00|94:01:01:82:83:00:81:64|
12:55:03.181468 Sent 0x|0D:00|39:00|80|00:00|94:01:01:82:83:00:81:64|
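The timestamps above fit a simple timing model. The sketch below is an assumption reconstructed from this log, not the implementation of Periodic: without netto_time the calls start on a fixed grid and no wait follows the last call, with netto_time the interval is counted from the end of the previous call. The helper periodic_starts is hypothetical, and the time needed to send a tone is taken as zero.

```python
def periodic_starts(intervall, num, action_time, netto_time=False):
    """Return the start offsets of num calls and the total run time (model only)."""
    # netto: the wait begins when the action has ended; otherwise a fixed grid
    step = intervall + action_time if netto_time else intervall
    starts = [i * step for i in range(num)]
    total = starts[-1] + action_time
    return starts, total


# t_inner: 4 tones every 0.25 s, padded to 1 s by duration=1
inner_starts, inner_total = periodic_starts(0.25, 4, action_time=0.0)
inner_total = max(inner_total, 1.0)

# t_middle: starts t_inner twice, every 2 s
middle_starts, middle_total = periodic_starts(2, 2, action_time=inner_total)

# t_outer: starts t_middle twice, with 2 s between end and next start
outer_starts, _ = periodic_starts(2, 2, action_time=middle_total, netto_time=True)

offsets = [o + m + i for o in outer_starts for m in middle_starts for i in inner_starts]
print(offsets)
# [0.0, 0.25, 0.5, 0.75, 2.0, 2.25, 2.5, 2.75, 5.0, 5.25, 5.5, 5.75, 7.0, 7.25, 7.5, 7.75]
```

Measured from the first line of the log (12:54:55.427), the four groups of tones start after roughly 0, 2, 5 and 7 seconds, which agrees with these offsets.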
Great, it works! Maybe you get the feeling that using tasks will result in Perl's motto: There's more than one way to do it. Yes, I think so, and I'm a bit concerned about it. But for the moment I see this freedom as an opportunity.
At the beginning of this lesson, we compared class Task and class Thread. We did not see any real advantage of class Task. Then we developed it further and it became a useful tool to organize tasks.
Conclusion
We coded a family of task classes with the following relatives:
Task: encapsulates a callable, which will be executed once.
class Task(builtins.object)
| Uses multithreading for tasks or chains of tasks.
| In standard case it's an action, which is executed by a single callable.
| Subsequent tasks or chains of tasks can be added with method append().
|
| Methods defined here:
|
| __init__(self, action, args:tuple=(), kwargs:dict={}, duration:float=None, join:bool=False)
| action: callable object (f.i. a function)
| args: argument list of action
| kwargs: keyword arguments of action
| duration: duration of task (if action returns earlier, task will wait)
| join: flag if contained task will be joined
Repeated: encapsulates a callable that will run multiple times.
class Repeated(Task)
| Organizes repeated actions with multithreading (control comes back immediately).
| think of task as:
|     while True:
|         gap = action(*args, **kwargs)
|         if gap is False or gap is None:
|             pass
|         elif gap is True or gap == -1:
|             break
|         else:
|             time.sleep(gap)
|
| Methods defined here:
|
| __init__(self, action, args:tuple=(), kwargs:dict={}, num:int=0, duration:float=0, netto_time:bool=False)
| action: callable object, which is repeatedly called (f.i. a function)
| If callable it must return a number, a bool or None:
| True, -1: end the loop
| False, None: next call directly follows (if not reached limit of num)
| positive number: time gap between the actual and the next call
| args: argument list of action
| kwargs: keyword arguments of action
| num: number of calls (0 stands for unlimited)
| duration: duration of task (if execution ends earlier, task will wait)
| netto_time: flag, that waiting is netto (execution of action counts extra)
Periodic: encapsulates a callable to run it multiple times with a fixed time interval.
class Periodic(Task)
| Uses multithreading for periodic actions (control comes back immediately).
| think of task as:
|     while not action(*args, **kwargs):
|         time.sleep(intervall)
|
| Methods defined here:
|
| __init__(self, intervall:float, action, args:tuple=(), kwargs:dict={}, num:int=0, duration:float=0, netto_time:bool=False)
| intervall: intervall between two calls of action (in seconds)
| action: callable object, which is repeatedly called (f.i. a function)
| It returns a bool or None:
| True: end the loop
| False, None: next call will follow (if not reached limit of num)
| args: argument list of action
| kwargs: keyword arguments of action
| num: number of calls (0 stands for unlimited)
| duration: duration of task (if execution ends earlier, task will wait)
| netto_time: flag, that waiting is netto (execution of action counts extra)
Sleep: sleeps
class Sleep(Task)
| Sleeps
|
| Methods defined here:
|
| __init__(self, seconds:float)
| seconds: duration of sleeping
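The "think of task as" loop in the description of Repeated can be made runnable. This is a single-threaded sketch under assumptions: the helper run_repeated is hypothetical, it ignores duration and netto_time, and the way num limits the calls is one reading of "number of calls (0 stands for unlimited)".

```python
import time


def run_repeated(action, args=(), kwargs=None, num=0):
    """Call action repeatedly, as in the docstring loop; return the number of calls."""
    kwargs = kwargs or {}
    calls = 0
    while True:
        gap = action(*args, **kwargs)
        calls += 1
        if gap is True or gap == -1:
            break                   # the action asked to end the loop
        if num and calls >= num:
            break                   # limit of num reached
        if gap is False or gap is None:
            continue                # next call follows directly
        time.sleep(gap)             # positive number: wait before the next call


    return calls


countdown = [3]

def tick():
    """Return a small gap twice, then True to end the loop."""
    countdown[0] -= 1
    return True if countdown[0] == 0 else 0.01

print(run_repeated(tick))                 # 3
print(run_repeated(lambda: None, num=2))  # 2
```

The real classes run this loop in a thread of their own, so control comes back immediately. The sketch blocks until the loop has ended.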
Task objects and objects of its subclasses can be combined (in any order) to build chains of tasks. Behind the scenes, they use multithreading, which allows starting parallel tasks (even inside a task one can start parallel tasks).
All tasks can be parametrized for exact timing, which gives a high level of time control. Locking is also done in the background. Only in rare cases does one need to think about the locking mechanism.
Task objects have the following methods:
| append(self, task) -> 'Task'
| appends a task or a chain of tasks (both must be root tasks)
|
| join(self) -> None
| joins the thread of the task
| think as: my_task.thread.join(), but evaluated when called
|
| start(self) -> 'Task'
| starts execution of task (finished tasks may be started again)
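To make the three methods concrete, here is a heavily reduced sketch of a linked chain that runs in its own thread. MiniTask is a hypothetical stand-in, not the lesson's class Task: it has no locking, no states, no duration and no checks for root tasks.

```python
import threading


class MiniTask:
    """Hypothetical, heavily reduced stand-in for class Task."""

    def __init__(self, action, args=()):
        self._action = action
        self._args = args
        self._next = None       # next link of the chain
        self._last = self       # last link (meaningful for the root only)
        self._thread = None

    def append(self, task):
        """Append a task or a chain of tasks and return the root."""
        self._last._next = task
        self._last = task._last
        return self

    def _run(self):
        link = self
        while link is not None:
            link._action(*link._args)
            link = link._next

    def start(self):
        """Start the whole chain in a new thread; control comes back immediately."""
        self._thread = threading.Thread(target=self._run)
        self._thread.start()
        return self

    def join(self):
        """Wait until the thread of the chain has ended."""
        self._thread.join()


log = []
chain = MiniTask(log.append, ("a",)).append(MiniTask(log.append, ("b",)))
chain.start().join()
print(log)   # ['a', 'b']
```

Because start returns the task itself, the calls can be chained as in the test programs of this lesson.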
Tasks have an easy-to-handle API and hide the details of multithreading and locking, yet they are flexible to use. They are independent of the EV3 device and can be used in many kinds of software projects where multithreading is needed.
Tasks are both architecture and glue. They allow us to code callable atoms and then put them together into tasks with complex functionality. Constructing a task from its atoms often needs the thinking of an architect, but all resulting Task objects have the very same simple API. The users of Task objects need no knowledge of their inner structure.
For the moment, tasks can't be stopped and continued and their error handling must be improved. These will be topics of our next lesson. We will extend the methods to:
| append(self, task) -> 'Task'
| appends a task or a chain of tasks (both must be root tasks)
|
| cont(self, gap:float=0) -> 'Task'
| continues a stopped task (must be a root task)
| gap: sets the waiting time before the next action occurs (in seconds)
|
| join(self) -> None
| joins the thread of the task
| think as: my_task.thread.join(), but evaluated when called
|
| start(self, gap:float=0) -> 'Task'
| starts execution of task (finished or stopped tasks may be started again)
| gap: sets the waiting time, before start occurs (in seconds)
|
| stop(self) -> None
| stops execution as fast as possible
| allows to continue with method cont or restart with method start
| already finished tasks silently do nothing
Now the time has come to play around with the new tools. Modify your programs The depressed giraffe and The dancing robot to use tasks. Be creative, realize some of your own ideas and develop a feeling for the task concept. When you have finished playing, come back for lesson 9.