Saturday 14 May 2016

Lesson 8 - Organizing Tasks

EV3 Direct commands - Lesson 08

Introduction

After the sightseeing tour of our last lesson we come now to the real thing, tools that implement multithreading and help to organize multiple tasks. If you didn't read lesson 7, you need profound knowledge of multithreading. If you are not this expert, please visit lesson 7 and then come back.

The basic idea we build our tools upon are linked lists. We name them tasks. A task is an object, that can be started and then executes a sequence of callables. The final result will be somewhat complex (only inside, its outside API will be very simple). Therefore we start with a basic version and add the functionality step by step.

Combining linked lists with multithreading

Let's look at a first version:


#!/usr/bin/env python3

import threading, typing

STATE_INIT = 'INIT'
STATE_STARTED = 'STARTED'
STATE_FINISHED = 'FINISHED'

def concat(*tasks) -> 'Task':
    chain = None
    for task in tasks:
        assert isinstance(task, Task), 'tasks must be instances of class Task'
        if not chain:
            chain = task
        else:
            chain.append(task)
    return chain

class Task:
    def __init__(
            self,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
    ):
        assert isinstance(action, typing.Callable), \
            "action needs to be a callable"
        assert isinstance(args, tuple), 'args needs to be a tuple'
        assert isinstance(kwargs, dict), 'kwargs needs to be a dictionary'
        self._action = action
        self._args = args
        self._kwargs = kwargs
        self._next = None
        self._root = self
        # the following are root only attributes
        self._state = STATE_INIT
        self._thread = None
        self._lock = threading.Lock()
        self._last = self

    def append(self, task: 'Task') -> 'Task':
        self._lock.acquire()
        assert isinstance(task, Task), "task needs to be a Task instance"
        assert self._root is self, "only root tasks can be appended"
        assert task._root is task, "both tasks need to be root tasks"
        assert self._state in [STATE_INIT, STATE_FINISHED], "can't append to tasks in state " + self._state
        assert task._state in [STATE_INIT, STATE_FINISHED], "can't append tasks in state " + task._state
        self._last._next = task
        self._last = task._last
        task.root = self
        self._lock.release()
        return self

    def start(self) -> 'Task':
        self._lock.acquire()
        assert self._root is self, "only root tasks can be started"
        assert self._state in [STATE_INIT, STATE_FINISHED], "can't start from state " + self._state
        self._state = STATE_STARTED
        self._thread = threading.Thread(target=self._execute)
        self._thread.start()
        return self

    def join(self) -> None:
        self._thread.join()
        
    def _execute(self) -> None:
        self._root._lock.release()
        self._action(*self._args, **self._kwargs)
        self._root._lock.acquire()
        if self._next:
            self._next._execute()
        else:
            self._root._state = STATE_FINISHED
            self._root._lock.release()
            return

    @property
    def state(self) -> str:
        self._lock.acquire()
        value = self.state_no_lock
        self._lock.release()
        return value
    
    @property
    def state_no_lock(self) -> str:
        assert self._root is self, "only root tasks can be asked about their state"
        return self._state
    
    @property
    def lock(self) -> threading.Lock:
        assert self._root is self, "only root tasks can be asked about their lock"
        return self._lock

    @property
    def root(self):
        return self._root
    @root.setter
    def root(self, task):
        self._root = task
        if self._next:
            self._next.root = task
      
Remarks:
  • A task object is a linked list (chain of tasks), that starts with a root task, which is followed by an unlimited number of links. All of them are Task objects.
  • All Task objects are created as root tasks. Method append does what its name says, it appends a root task (Task object) to another root task. The result is a chain of tasks, where the appended task is no more a root task.
  • Function concat is syntactic sugar, which allows to build a chain of tasks from a tuple of Task objects.
  • All comunication from and to the outside world is done via the root task, which represents the whole chain of tasks. The following tasks in the linked list become inaccessible for the outside world.
  • Attributes of all tasks:
    • _action: callable object (f.i. a function or a method).
    • _args: argument list of _action.
    • _kwargs: keyword arguments of _action.
    • _next: pointer to the next Task object in the linked list.
    • _root: pointer to the root task (also a Task object).
  • Additional attributes of root tasks:
    • _state: actual state of the task (or chain of tasks).
    • _thread: The thread, that does the execution (Thread instance from module threading).
    • _lock: The Lock object (from module threading).
    • _last: pointer to the last task in the chain (method append needs it).
  • A root task can be identified by self._root is self.
  • Method start creates a new thread, that does the execution. start itself returns immediately. If you need the started thread for your timing, you can call method join (f.i. task.start().join()).
  • Method _execute is the heart of the execution. Every task executes its action and then calls method _execute of the next link in the chain.
  • Nearly all methods and properties start with aquiring the Lock object. This guaranties an exclusive access and finds an object in a consitent state. At the end of the method (or property logic), the lock is released.
  • The lock is released, when _action starts execution and again acquired, after _action is finished. This says another thread gets access to a Task object, when:
    • its state is STATE_INIT (before started),
    • its state is STATE_FINISHED,
    • one of its actions is actually executed.
  • All the properties, that read changeable data need a locking mechanism. If the outside world has to read more than one property, it can explicitly lock and then call the no-lock-version of the properties:
    
    ...
    my_task.lock.acquire()
    if my_task.state_no_lock is task.STATE_FINISHED:
        print("finished")
    elif my_task.state_no_lock is task.STATE_STARTED:
        print("started")
    else:
        print("initial")
    my_task.lock.release()
    ...   
       
    If no explicit locking would be done and the state changed just between the two accesses (STARTED -> FINISHED), it printed an incorrect state (INIT). This case is for demonstration only, a better solution would be:
    
    ...
    state = my_task.state
    if state is task.STATE_FINISHED:
        print("finished")
    elif state is task.STATE_STARTED:
        print("started")
    else:
        print("initial")
    ...   
       
  • the setter of property root is recursive. If set in a former root task, it sets the new root for the whole chain of tasks.

Task objects versus Thread objects

Actually Task objects are very similar to Thread objects. Both have methods start and join. The main difference is that Task objects allow to build chains and handle the chains like single Task objects.

Parallel executed tasks

As a first test, we run two tasks parallel:


#!/usr/bin/env python3

import ev3, ev3_sound, ev3_vehicle, task

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
vehicle = ev3_vehicle.TwoWheelVehicle(0.02128, 0.1175, ev3_obj=jukebox)

speed = 10
t1 = task.Task(
    vehicle.drive_turn,
    args=(speed, 0, 720)
)
t2 = task.Task(
    jukebox.play_song,
    args=(ev3_sound.HAPPY_BIRTHDAY,)
)
t1.start()
t2.start()
t1.join()
t2.join()
vehicle.stop()
      
The vehicle turns two circles on place and plays Happy Birthday. The longer of both tasks does the timing.

If we use Thread objects instead of Task objects, this reads as:


#!/usr/bin/env python3

import ev3, ev3_sound, ev3_vehicle, threading

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
vehicle = ev3_vehicle.TwoWheelVehicle(0.02128, 0.1175, ev3_obj=jukebox)

speed = 10
t1 = threading.Thread(
    target=vehicle.drive_turn,
    args=(speed, 0, 720)
)
t2 = threading.Thread(
    target=jukebox.play_song,
    args=(ev3_sound.HAPPY_BIRTHDAY,)
)
t1.start()
t2.start()
t1.join()
t2.join()
vehicle.stop()
    
There is no real difference, the handling of both alternatives is very similar.

Chains of tasks

We run a chain of tasks:


#!/usr/bin/env python3

import ev3, ev3_sound, ev3_vehicle, task

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
vehicle = ev3_vehicle.TwoWheelVehicle(0.02128, 0.1175, ev3_obj=jukebox)

speed = 10
task.concat(
    task.Task(
        vehicle.drive_turn,
        args=(speed, 0, 360)
    ),
    task.Task(
        vehicle.drive_turn,
        args=(speed, 0, 360),
        kwargs={"right_turn": True}
    ),
    task.Task(
        vehicle.stop
    )
).start()
jukebox.play_song(ev3_sound.HAPPY_BIRTHDAY)
      
The movement of the vehicle is a chain of three links, driving and music run parallel.

We use a Thread object instead of Task objects:


#!/usr/bin/env python3

import ev3, ev3_sound, ev3_vehicle, threading

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
vehicle = ev3_vehicle.TwoWheelVehicle(0.02128, 0.1175, ev3_obj=jukebox)

speed = 10

def do_it():
    vehicle.drive_turn(speed, 0, 360)
    vehicle.drive_turn(speed, 0, 360, right_turn=True)
    vehicle.stop()
    
threading.Thread(
    target=do_it
).start()
jukebox.play_song(ev3_sound.HAPPY_BIRTHDAY)
      
O.k. it works, but in its actual state it is no real progress to use Task objects instead of Thread objects. It needs additional functionality to make it a valuable tool.

Repeated actions

Let's look at some scenarios and prove if our class Task is able to execute them. Here they are:

  • Every 0.2 sec. your EV3 device reads the actual free distance from its infrared sensor. This is repeated 100 times. The times and data are written to a file.
  • Your application drives a vehicle: Every 0.5 sec. it tests if there is no barrier in front of the vehicle, if the distance is smaller than 30 cm, it stops the motors.
  • Your application watches a door: Every 5 sec. it tests, if the touch sensor is touched, if not, it plays some sound signal.
  • Your application is a vehicle, that follows a light. Every 20 sec. its light sensor scans all directions for the brightest source of light. Then the orientation of the vehicle is changed to this direction.
All these scenarios don't fit our tool. We need some modifications to deal with repeated actions. Hopefully you remember class TwoWheelVehicle of lesson 6. We had a very similar situation, where functions _test_pos and _test_o returned numbers:
  • value == -1: signals the caller to finish the loop.
  • value > 0: value is the time to wait until the next call (in sec.).
  • value == 0: call again without any waiting.

We need some new constants and we import modules time and numbers:


#!/usr/bin/env python3

import threading, typing, time, numbers

...

ACTIVITY_NONE = 'NONE'
ACTIVITY_BUSY = 'BUSY'
ACTIVITY_SLEEP = 'SLEEP'
  

We change the constructor of class Task:


    def __init__(
            self,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
    ):
        ...
        self._num = 0
        self._cnt = 0
        # the following are root only attributes
        ...
        self._cond = threading.Condition(self._lock)
        self._activity = ACTIVITY_NONE
        self._time_action = None
      
The meaning of the new attributes:
  • _num: maximum calls number of _action (value 0 stands for unlimited).
  • _cnt: counter of _action calls.
  • _cond: used for interruptable sleeping.
  • _activity: actual activity type (ACTIVITY_NONE, ACTIVITY_BUSY or ACTIVITY_SLEEP).
  • _time_action: time of the actual call of _action (when _activity is ACTIVITY_BUSY) or the next call of _action (when _activity is ACTIVITY_SLEEP).

We add two lines of code to method start:


    def start(self) -> None:
        ...
        self._cnt = 0
        self._time_action = time.time()
        self._thread = threading.Thread(target=self._execute)
        self._thread.start()
      

The new version of method _execute:


    def _execute(self) -> None:
        while True:
            gap = self._wrapper()
            self._cnt += 1
            if gap == -1 or self._num > 0 and self._cnt >= self._num:
                self._root._time_action = time.time()
                break
            if gap == 0:
                self._root._time_action = time.time()
                continue
            self._root._time_action += gap
            real_gap = self._root._time_action - time.time()
            if real_gap > 0:
                self._root._activity = ACTIVITY_SLEEP
                self._root._cond.wait(real_gap)
                self._root._activity = ACTIVITY_NONE
        self._root._time_action = time.time()
        gap = 0
        if gap > 0:
            self._root._activity = ACTIVITY_SLEEP
            self._root._cond.wait(gap)
            self._root._activity = ACTIVITY_NONE
        if self._next:
            self._next._cnt = 0
            self._next._execute()
        else:
            self._final()
      
with new methods, that wrapp _action:

    def _wrapper(self) -> int:
        self._wrapper1()
        self._action(*self._args, **self._kwargs)
        self._wrapper2()
        return -1
    
    def _wrapper1(self):
        self._root._activity = ACTIVITY_BUSY
        self._root._lock.release()

    def _wrapper2(self):
        self._root._lock.acquire()
        self._root._activity = ACTIVITY_NONE
      
returning value -1 guaranties that the loop is executed once and no sleeping will happen. This sounds funny, we code some new logic which is not used. The explanation is: Task objects will never use it, but their subclasses! It allows to subclass Task with unchanged method _execute but new method _wrapper.

We add a new method, that prepares a consistent combination of attributes in case of returning:


    def _final(self) -> None:
        self._root._state = STATE_FINISHED:
        self._root._time_action = None
        self._root._lock.release()
      

We add some properties:


    @property
    def time_action(self) -> float:
        self._root._lock.acquire()
        assert self._root is self, 'only root tasks can be asked about their time_action'
        value = self.time_action_no_lock
        self._root._lock.release()
        return value

    @property
    def time_action_no_lock(self) -> float:
        return self._time_action

    @property
    def activity(self) -> str:
        self._root._lock.acquire()
        assert self._root is self, 'only root tasks can be asked about their activity'
        value = self.activity_no_lock
        self._root._lock.release()
        return value

    @property
    def activity_no_lock(self) -> str:
        return self._activity
      

Class Repeated

We subclass Task:


class Repeated(Task):
    def __init__(
            self,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
            num: int=0
    ):
        super().__init__(action, args, kwargs)
        assert isinstance(num, int), 'num must be an integer'
        assert num >= 0, 'num must be positive'
        self._num = num

    def _wrapper(self):
        self._wrapper1()
        value = self._action(*self._args, **self._kwargs)
        assert isinstance(value, Task) or \
            isinstance(value, numbers.Number) or \
            isinstance(value, bool) or \
            value is None, \
            'action needs to return a task, a number, a boolean or None'
        assert not isinstance(value, numbers.Number) or \
            value == -1 or \
            value >= 0, \
            'if action return a number, it must be positive or -1'
        if value is True:
            rc = -1
        elif  isinstance(value, Task) or value is False or value is None:
            rc = 0
        else:
            rc = value
        self._wrapper2()
        return rc
      
It's the callable _action that controls the execution by its return value. There are three types of return values (None, bool, int):
  • None: There is no waiting and the number of calls of _action must be limited by num.
  • bool: There is no waiting, the calling ends, when value True is returned.
  • int:
    • -1: no further call (of _action).
    • 0: no waiting, call again as soon as possible.
    • > 0: wait, then call again.
num is an alternative to control the execution. If set, it is an upper limit for the number of calls of _action. It must be set when the Repeated object is constructed.

Tests

We play the trias three times:


#!/usr/bin/env python3

import ev3, ev3_sound, task

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
task.Repeated(
    jukebox.play_song,
    args=(ev3_sound.TRIAS,),
    num=3
).start()
      

Task factories

I feel confident in the task concept and try to organize as much actions as possible in tasks. The best way to this target is implementing tasks into the subclasses of class EV3.

Method song of class Jukebox

We start with class Jukebox and add a method song:


    def song(self, song: dict) -> task.Task:
        return task.concat(
            task.Task(
                self._init_tone
            ),
            task.Repeated(
                self._next_tone,
                args=(song,)
            ),
            task.Task(
                self.stop
            )                
        )
      
This method returns a Task object that plays the song, when started. It is a chain of tasks with three links, but this is an internal information. The ouside world knows: it's a task, which can be concatenated or started. We test it with this program:

#!/usr/bin/env python3

import ev3, ev3_sound, task

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.song(ev3_sound.HAPPY_BIRTHDAY).start()
      
We miss the lightshow, which will be added soon. Everything seems to work. If you look at the above described scenarios, we already have the tools to realize them. Periodic, our next subclass of Task will be syntactic sugar. It prevents from writing wrappers around callables.

Periodic actions

All of our scenarios knew the time distance between two calls of _action from the beginning. This allows to give them as sttributes into the constructor of the objects. It does not help for transparency, when the time distance is returned by method _action. Our new subclass Periodic will better fit this situation:


class Periodic(Task):
    def __init__(
            self,
            intervall: float,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
            num: int=0
    ):
        super().__init__(action, args, kwargs)
        assert isinstance(intervall, numbers.Number), 'intervall must be a number'
        assert intervall >= 0, 'intervall must be positive'
        assert isinstance(num, int), 'num must be an integer'
        assert num >= 0, 'num must be positive'
        self._intervall = intervall
        self._num = num

    def _wrapper(self):
        self._wrapper1()
        value = self._action(*self._args, **self._kwargs)
        assert isinstance(value, Task) or isinstance(value, bool) or value is None, \
            'action needs to return a task, a boolean or None'
        if value is True:
            rc = -1
        else:
            rc = self._intervall
        self._wrapper2()
        return rc
      
The waiting time is set, when the constructor is called (parameter intervall). I think, a good and descriptive variant. Look at method _wrapper, the callable _action may return two types of values (None, bool):
  • None: You have to limit the number of calls of _action by setting num.
  • bool: The calling ends, when value True is returned.

Test

We play the trias three times, once per minute:


#!/usr/bin/env python3

import ev3, ev3_sound, task

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
task.Periodic(
    60,
    jukebox.play_song,
    args=(ev3_sound.TRIAS,),
    num=3
).start()
      

Exact timing

For the moment, exact timing works for repetitions but not for chains of tasks. We want:

  • the possibility to switch to netto-time (that says the waiting for the next call of _action does not depend from the duration of its last execution),
  • a chance to set a fixed duration for a task.
This says we have to add some functionality. The new constructor:

    def __init__(
            self,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
            duration: float=None
    ):
        ...
        assert duration is None or isinstance(duration, numbers.Number), \
            'duration needs to be a number'
        assert duration is None or duration >= 0, \
            'duration needs to be positive'
        ...
        self._duration = duration
        self._time_end = None
        self._netto_time = False
        # the following are root only attributes
        ...
      
The meanings of the new attributes:
  • _duration: duration for this task. If the task finishes later, the following tasks have to compensate.
  • _time_end: holds the time, when this task has to be finished (is set when executed).
  • _netto_time: flag, that waiting is netto (execution of action counts extra)

We change method start:


    def start(self) -> None:
        self._lock.acquire()
        assert self._root is self, "only root tasks can be started"
        assert self._state in [STATE_INIT, STATE_FINISHED], \
            "can't start from state " + self._state
        self._state = STATE_STARTED
        self._cnt = 0
        self._time_action = time.time()
        if self._duration != None:
            self._time_end = self._time_action + self._duration
        self._thread = threading.Thread(target=self._execute)
        self._thread.start()
      

The updated method _execute:


    def _execute(self) -> None:
        while True:
            gap = self._wrapper()
            self._cnt += 1
            if gap == -1 or self._num > 0 and self._cnt >= self._num:
                self._root._time_action = time.time()
                break
            if gap == 0:
                self._root._time_action = time.time()
                continue
            if self._netto_time:
                self._root._time_action = time.time() + gap
                real_gap = gap
            else:
                self._root._time_action += gap
                real_gap = self._root._time_action - time.time()
            if real_gap > 0:
                self._root._activity = ACTIVITY_SLEEP
                self._root._cond.wait(real_gap)
                self._root._activity = ACTIVITY_NONE
        if self._time_end:
            self._root._time_action = self._time_end
            self._time_end = None
            gap = self._root._time_action - time.time()
            if gap > 0:
                self._root._activity = ACTIVITY_SLEEP
                self._root._cond.wait(gap)
                self._root._activity = ACTIVITY_NONE
        else:
            self._root._time_action = time.time()
        if self._next:
            self._next._cnt = 0
            if self._next._duration != None:
                self._next._time_end = self._root._time_action + self._next._duration
            self._next._execute()
        else:
            self._final()
      
There is an additional call of method sleep at the end of the task. This call guaranties, that the task lasts at least the duration, which was set by the constructor.

_netto_time is relevant for the classes Repeated and Periodic only. We need to change their constructors. The new constructor of class Repeated:


    def __init__(
            self,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
            num: int=0,
            duration: float=0,
            netto_time: bool=False
    ):
        super().__init__(action, args, kwargs, duration=duration)
        assert isinstance(num, int), 'num must be an integer'
        assert num >= 0, 'num must be positive'
        assert isinstance(netto_time, bool), 'netto_time must be a bool value'
        self._num = num
        self._netto_time = netto_time
      

The new constructor of class Periodic:


    def __init__(
            self,
            intervall: numbers.Number,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
            num: int=0,
            duration: float=0,
            netto_time: bool=False
    ):
        super().__init__(action, args, kwargs, duration=duration)
        assert isinstance(intervall, numbers.Number), 'intervall must be a number'
        assert intervall >= 0, 'intervall must be positive'
        assert isinstance(num, int), 'num must be an integer'
        assert num >= 0, 'num must be positive'
        assert isinstance(netto_time, bool), 'netto_time must be a bool value'
        self._intervall = intervall
        self._num = num
        self._netto_time = netto_time
      

Subclass Sleep

This allows to create another subclass:


class Sleep(Task):
    def __init__(self, seconds: float):
        super().__init__(self._do_nothing, duration=seconds)

    def _do_nothing(self): return -1
      
Class Sleep will allow to be stopped, but this is the future. For the moment it is just an alternative for:

Task(time.sleep, args=(seconds,))
      
which is able to compensate the time deviations of previous tasks.

Tests

We test the compensation:


#!/usr/bin/env python3

import ev3, ev3_sound, task, time

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1

task.concat(
    task.Task(
        time.sleep,
        args=(1,),
        duration=0.5
    ),
    task.Periodic(
        1,
        jukebox.play_tone,
        args=("a'", 0.1),
        num=4
    )
).start()
      
Sleeping for one second with a duration of 0.5 sec.! The chain of tasks will be half a sec. late, when it has finished its first task (sleep). The second task (play_tone) has to compensate this delay. Its output:

10:27:54.356145 Sent 0x|0D:00|2A:00|80|00:00|94:01:01:82:B8:01:81:64|
10:27:54.855296 Sent 0x|0D:00|2B:00|80|00:00|94:01:01:82:B8:01:81:64|
10:27:55.854861 Sent 0x|0D:00|2C:00|80|00:00|94:01:01:82:B8:01:81:64|
10:27:56.854826 Sent 0x|0D:00|2D:00|80|00:00|94:01:01:82:B8:01:81:64|
      
Between the first call of play_tone and the second we see a time distance of half a sec. This compensates the delay. From then on the time distance is one sec. We recognize, that the usage of duration allows exact timing and also can prevent exact timing.

We test class Sleep with a program, that uses tasks for a little lightshow:


#!/usr/bin/env python3

import task, ev3, ev3_sound

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1

t_green = task.Periodic(2, jukebox.change_color, args=(ev3.LED_GREEN,), num=4)
t_red = task.Periodic(4, jukebox.change_color, args=(ev3.LED_RED,), num=2)
t_orange = task.Periodic(4, jukebox.change_color, args=(ev3.LED_ORANGE,), num=2)

t = task.concat(
    task.Task(t_red.start),
    task.Sleep(1),
    task.Task(t_green.start),
    task.Sleep(1),
    task.Task(t_orange.start),
    task.Task(t_green.join)
)
t.start().join()
print("done")
      
Here we have four tasks, t_green, t_red, t_orange and t. t is the master, that schedules the starting of the three other tasks. When t_orange is started, t joins the task of t_green. We want its thread to be joinable (for timing purpose, the join has to end when the lightshow ends). Its output:

11:26:32.103894 Sent 0x|08:00|2A:00|80|00:00|82:1B:02|
11:26:33.105693 Sent 0x|08:00|2B:00|80|00:00|82:1B:01|
11:26:34.107247 Sent 0x|08:00|2C:00|80|00:00|82:1B:03|
11:26:35.106055 Sent 0x|08:00|2D:00|80|00:00|82:1B:01|
11:26:36.104310 Sent 0x|08:00|2E:00|80|00:00|82:1B:02|
11:26:37.106052 Sent 0x|08:00|2F:00|80|00:00|82:1B:01|
11:26:38.107876 Sent 0x|08:00|30:00|80|00:00|82:1B:03|
11:26:39.106062 Sent 0x|08:00|31:00|80|00:00|82:1B:01|
done
      
It works and we learned, that tasks can start other tasks. We plan to make our tasks stoppable, which says that tasks inside of tasks (contained tasks) need to be stopped too. The conclusion is, we have to change class Task once more.

Contained tasks

We add a class attribute _contained_register to class Task:


    class Task:
        _contained_register = {}
      
Remarks:
  • This will hold all contained tasks with their parent tasks. It's a class attribute that exists once and can be accessed from all instances of class Task. It allows to ask if task1 is cild of (contained in) task2:
    
        if task1 in task.Task._contained_register and task.Task._contained_register[task1] is task2:
            print(task1, "is child of", task2)
       
  • The access is from the child to the parent. We delete an entry, when a task is finished:
    
        self._root._state = STATE_FINISHED
        if self._root in self._contained_register:
            self._contained_register.pop(self._root)
       

We add some code to the constructor:


class Task:
    def __init__(
            self,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
            join: bool=False,
            duration: float=None
    ):
        ...
        assert isinstance (join, bool), 'join needs to be a bool value'
        assert not join or hasattr(action, '__self__'), 'only bounded methods can be joined'
        assert not join or isinstance(action.__self__, Task), 'only instances of Task can be joined'
        assert not join or action.__name__ == "start", 'only method start can be joined'
        ...
        self._join = join
        ...
        # the following are root only attributes
        ...
        self._contained = []
      
The meanings of the new attributes:
  • _join: A flag, that signals if the contained task will be joined when starting it. This says the starting task will wait until the started task is finished.
  • _contained: Holds the tasks, which where started as contained (cild) tasks and may still run. In case of stopping the task, these have to be stopped too. This is the opposite direction than _contained_register: from the parent to the child. Its a list because a task may have multiple contained tasks.

Methods _wrapper1 and _wrapper2 also need to be changed:


    def _wrapper1(self) -> None:
        if hasattr(self._action, '__self__') and \
           isinstance(self._action.__self__, Task) and \
           self._action.__name__ in ["start", "join"]:
            task = self._action.__self__
            name = self._action.__name__
            if name == "start":
                if not task in self._root._contained:
                    self._root._contained.append(task)
                self._contained_register.update({task: self._root})
        if not hasattr(self._action, '__self__') or \
           not isinstance(self._action.__self__, Task) or \
           not self._action.__name__ == "start" or \
           self._action.__name__ == "start" and self._join:
            self._root._activity = ACTIVITY_BUSY
            self._root._lock.release()

    def _wrapper2(self) -> None:
        if self._join:
            self._action.__self__._thread.join()
        if not hasattr(self._action, '__self__') or \
           not isinstance(self._action.__self__, Task) or \
           not self._action.__name__ == "start" or \
           self._action.__name__ == "start" and self._join:
            self._root._lock.acquire()
            self._root._activity = ACTIVITY_NONE
        if hasattr(self._action, '__self__') and \
           isinstance(self._action.__self__, Task) and \
           self._action.__name__ in ["start", "join"]:
            task = self._action.__self__
            state = task.state
            if state == STATE_FINISHED and \
               task in self._root._contained:
                self._root._contained.remove(task)
      
This adds all started tasks to the root tasks list _contained and guaranties that the root task knows all contained tasks which run parallel. A few remarks:
  • Attribute __self__ of a method (bounded callable) holds the object, the method belongs to.
  • Attribute __name__ of a callable holds its name.
  • These two attributes help to identify contained tasks (_action calls method start of a Task instance):
    
            if hasattr(self._action, '__self__') and \
               isinstance(self._action.__self__, Task) and \
               self._action.__name__ == "start":
       
  • Starting a task is not time consuming, therefore we do not release the lock. Only if we join the started task.

Method _final needs to be changed:


    def _final(self) -> None:
        self._root._contained = self._join_contained()
        self._root._state = STATE_FINISHED
        if self._root in self._contained_register:
            self._contained_register.pop(self._root)
        self._root._time_action = None
        self._root._lock.release()
      
with a new method:

    def _join_contained(self) -> list:
        contained = self._root._contained
        self._root._activity = ACTIVITY_JOIN
        self._root._lock.release()
        not_finished = []
        for task in contained:
            if not task in self._contained_register or \
               not self._contained_register[task] is self._root:
                continue
            task.join()
            if task.state != STATE_FINISHED:
                not_finished.append(task)
        self._root._lock.acquire()
        self._root._activity = ACTIVITY_NONE
        return not_finished
      

The call of method _join_contained guaranties, that a task never is finished, before all its contained tasks are finished. We add a new activity:


ACTIVITY_NONE = 'NONE'
ACTIVITY_BUSY = 'BUSY'
ACTIVITY_SLEEP = 'SLEEP'
ACTIVITY_JOIN = 'JOIN'
      

We change property time_action_no_lock:


    @property
    def time_action_no_lock(self) -> float:
        min = None
        if not hasattr(self._action, '__self__') or \
           not isinstance(self._action.__self__, Task) or \
           not self._action.__name__ in ["start", "join"]:
            min = self._time_action
        for task in self._contained:
            act = task.time_action
            if min is None:
                min = act
            elif act != None and act < min:
                min = act
        return min
      
The earliest time of any action is returned, may it be of the tasks own action or one of its direct or indirect contained tasks (recursion). Starting or joining a task is no action, we ignore them. What counts is the next action of the contained task, which is considered by recursion.

I think, it is not a good idea, when instances of Repeated or Periodic start unjoined tasks. Therefore we don't add a parameter join to their constructors. This says, they can contain tasks but these are always joined tasks. The constructor of Repeated:


    def __init__(
            self,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
            num: int=0,
            duration: float=0,
            netto_time: bool=False
    ):
        if hasattr(action, '__self__') and \
           isinstance(action.__self__, Task) and \
           action.__name__ == "start":
            super().__init__(action, args, kwargs, join=True, duration=duration)
        else:
            super().__init__(action, args, kwargs, duration=duration)
        ...
      

The constructor of Periodic:


    def __init__(
            self,
            intervall: float,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
            num: int=0,
            duration: float=0,
            netto_time: bool=False
    ):
        if hasattr(action, '__self__') and \
           isinstance(action.__self__, Task) and \
           action.__name__ == "start":
            super().__init__(action, args, kwargs, join=True, duration=duration)
        else:
            super().__init__(action, args, kwargs, duration=duration)
        ...
      

Testing contained classes

We change our lightshow-program:


#!/usr/bin/env python3

import task, ev3, ev3_sound

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1

t_green = task.Periodic(2, jukebox.change_color, args=(ev3.LED_GREEN,), num=4)
t_red = task.Periodic(4, jukebox.change_color, args=(ev3.LED_RED,), num=2)
t_orange = task.Periodic(4, jukebox.change_color, args=(ev3.LED_ORANGE,), num=2)

task.concat(
    task.Task(t_red.start),
    task.Sleep(1),
    task.Task(t_green.start),
    task.Sleep(1),
    task.Task(t_orange.start)
).start().join()
print("done")
      
There is no need for joining t_green any more, this is done inside. This programs output:

12:39:08.368138 Sent 0x|08:00|2A:00|80|00:00|82:1B:02|
12:39:09.370120 Sent 0x|08:00|2B:00|80|00:00|82:1B:01|
12:39:10.373537 Sent 0x|08:00|2C:00|80|00:00|82:1B:03|
12:39:11.371645 Sent 0x|08:00|2D:00|80|00:00|82:1B:01|
12:39:12.368623 Sent 0x|08:00|2E:00|80|00:00|82:1B:02|
12:39:13.370486 Sent 0x|08:00|2F:00|80|00:00|82:1B:01|
12:39:14.374323 Sent 0x|08:00|30:00|80|00:00|82:1B:03|
12:39:15.370430 Sent 0x|08:00|31:00|80|00:00|82:1B:01|
done
      

Modifying the task factory

We add another method to class Jukebox, which also is a task factory.

Method sound

Method sound plays a sound file. It's API:


     |  sound(self, path:str, duration:float=None, repeat:bool=False) -> task.Task
     |      returns a Task object, that plays a sound file
     |      
     |      Attributes:
     |      path: name of the sound file (without extension ".rsf")
     |      
     |      Keyword Attributes:
     |      duration: duration of the sound file (in sec.)
     |      repeat: flag, if repeatedly playing
      

The code:


    def sound(self, path: str, duration: float=None, repeat: bool=False) -> task.Task:
        if repeat:
            ops = b''.join([
                ev3.opSound,
                ev3.REPEAT,
                ev3.LCX(self._volume), # VOLUME
                ev3.LCS(path)          # NAME
            ])
        else:
            ops = b''.join([
                ev3.opSound,
                ev3.PLAY,
                ev3.LCX(self._volume), # VOLUME
                ev3.LCS(path)          # NAME
            ])
        if not duration:
            return task.Task(
                self.send_direct_cmd,
                args=(ops,)
            )
        elif not repeat:
            return task.Task(
                self.send_direct_cmd,
                args=(ops,),
                duration=duration
            )
        else:
            return task.concat(
                task.Task(
                    self.send_direct_cmd,
                    args=(ops,),
                    duration=duration
                ),
                task.Task(self.stop)
            )
      

We test it with this program:


#!/usr/bin/env python3

import ev3, ev3_sound

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1

t = jukebox.sound('./ui/DownloadSucces', duration=5, repeat=True).start()
      
It's output:

12:43:28.528101 Sent 0x|1D:00|2A:00|80|00:00|94:03:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
12:43:33.528569 Sent 0x|07:00|2B:00|80|00:00|94:00|
      

A second test where we wrap a Repeated around the Task:


#!/usr/bin/env python3

import ev3, ev3_sound, task

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1

t = task.Repeated(
    jukebox.sound('./ui/DownloadSucces', duration=2).start,
    num=3
).start()
      
The output:

12:45:46.594595 Sent 0x|1D:00|2A:00|80|00:00|94:02:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
12:45:48.612966 Sent 0x|1D:00|2B:00|80|00:00|94:02:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
12:45:50.614405 Sent 0x|1D:00|2C:00|80|00:00|94:02:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
      

Method song

Now we add the lightshow to method song. First we add an attribute _pos_led to the constructor of class Jukebox:


class Jukebox(ev3.EV3):
    def __init__(self, protocol: str=None, host: str=None, ev3_obj=None):
        super().__init__(protocol=protocol, host=host, ev3_obj=ev3_obj)
        self._volume = 1
        self._temperament = 440
        self._pos_tone = None
        self._pos_led = None
        self._plays = False
      
Then we add the methods _init_color and _next_color to class Jukebox:

    def _init_color(self) -> None:
        self._pos_led = 0

    def _next_color(self, song) -> bool:
        if not self._plays:
            return True
        self.change_color(song["led_sequence"][self._pos_led])
        self._pos_led += 1
        self._pos_led %= len(song["led_sequence"])
      
We modify method stop:

    def stop(self) -> None:
        self.send_direct_cmd(ev3.opSound + ev3.BREAK)
        if self._plays:
            self._plays = False
            self.change_color(ev3.LED_GREEN)
      
We modify method song:

    def song(self, song: dict) -> task.Task:
        tones = task.concat(
            task.Task(self._init_tone),
            task.Repeated(
                self._next_tone,
                args=(song,)
            ),
            task.Task(self.stop)
        )
        colors = task.Periodic(
            60 * song["beats_per_bar"] / song["tempo"],
            self._next_color,
            args=(song,)
        )
        if "upbeat" in song:
            colors = task.concat(
                task.Sleep(60 * song["upbeat"] / song["tempo"]),
                colors
            )
        colors = task.concat(
            task.Task(self._init_color),
            colors
        )
        return task.concat(
            task.Task(tones.start),
            task.Task(colors.start),
            task.Task(tones.join)
        )
      
There is a little trick in this code. Up to now, we can't stop a Task object. Here we need to stop task colors when task tones is finished. This is done by method stop, which sets attribute _plays = False. Attribute _plays signals method _next_color to return value True. This isn't good style and we will simplify the code, when our tasks can be stopped. For the moment we are happy with the solution and we test it:

#!/usr/bin/env python3

import task, ev3, ev3_sound_tmp as ev3_sound

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1
task.concat(
    jukebox.song(ev3_sound.HAPPY_BIRTHDAY),
    task.Sleep(2),
    jukebox.song(ev3_sound.TRIAS)
).start()
      

Last Test

As the last test of this lesson, we pack a Periodic into a Periodic into a Periodic:


#!/usr/bin/env python3

import task, ev3, ev3_sound

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1

t_inner = task.Periodic(0.25, jukebox.play_tone, args=("c", 0.1), duration=1, num=4)
t_middle = task.Periodic(2, t_inner.start, num=2)
t_outer = task.Periodic(2, t_middle.start, num=2, netto_time=True)
t_outer.start()
      
its output:

12:54:55.427018 Sent 0x|0D:00|2A:00|80|00:00|94:01:01:82:83:00:81:64|
12:54:55.678632 Sent 0x|0D:00|2B:00|80|00:00|94:01:01:82:83:00:81:64|
12:54:55.928161 Sent 0x|0D:00|2C:00|80|00:00|94:01:01:82:83:00:81:64|
12:54:56.177827 Sent 0x|0D:00|2D:00|80|00:00|94:01:01:82:83:00:81:64|
12:54:57.427841 Sent 0x|0D:00|2E:00|80|00:00|94:01:01:82:83:00:81:64|
12:54:57.678454 Sent 0x|0D:00|2F:00|80|00:00|94:01:01:82:83:00:81:64|
12:54:57.928489 Sent 0x|0D:00|30:00|80|00:00|94:01:01:82:83:00:81:64|
12:54:58.178475 Sent 0x|0D:00|31:00|80|00:00|94:01:01:82:83:00:81:64|
12:55:00.430093 Sent 0x|0D:00|32:00|80|00:00|94:01:01:82:83:00:81:64|
12:55:00.680987 Sent 0x|0D:00|33:00|80|00:00|94:01:01:82:83:00:81:64|
12:55:00.930972 Sent 0x|0D:00|34:00|80|00:00|94:01:01:82:83:00:81:64|
12:55:01.181053 Sent 0x|0D:00|35:00|80|00:00|94:01:01:82:83:00:81:64|
12:55:02.430725 Sent 0x|0D:00|36:00|80|00:00|94:01:01:82:83:00:81:64|
12:55:02.681313 Sent 0x|0D:00|37:00|80|00:00|94:01:01:82:83:00:81:64|
12:55:02.931478 Sent 0x|0D:00|38:00|80|00:00|94:01:01:82:83:00:81:64|
12:55:03.181468 Sent 0x|0D:00|39:00|80|00:00|94:01:01:82:83:00:81:64|
      
Great, it works! Maybe you get the feeling, that using tasks will result in perls motto There's more than one way to do it. Yes I think so and I'm a bit concerned about it. But for the moment I see this freedom as a chance.

At the beginning of this lesson, we compared class Task and class Thread. We did not see any real advantage of class Task. Then we developed it further and it became a usefull tool to organize tasks.

Conclusion

We coded a family of task classes with the following relatives:

  • Task: encapsulates a callable, which will be executed once.
    
        class Task(builtins.object)
         |  Uses multithreading for tasks or chains of tasks.
         |  In standard case it's an action, which is executed by a single callable.
         |  Subsequent tasks or chains of tasks can be added with method append().
         |  
         |  Methods defined here:
         |  
         |  __init__(self, action, args:tuple=(), kwargs:dict={}, duration:float=None, join:bool=False)
         |      action: callable object (f.i. a function)
         |      args: argument list of action
         |      kwargs: keyword arguments of action
         |      duration: duration of task (if action returns earlier, task will wait)
         |      join: flag if contained task will be joined
       
  • Repeated: encapsulates a callable that will run multiple times.
    
        class Repeated(Task)
         |  Organizes repeated actions with multithreading (control comes back immediately).
         |  think of task as:
         |      while True:
         |          gap = action(*args, **kwargs)
         |          if gap is False or gap is None:
         |              pass
         |          elif gap is True or gap == -1:
         |              break
         |          else:
         |              time.sleep(gap)
         |  
         |  Methods defined here:
         |  
         |  __init__(self, action, args:tuple=(), kwargs:dict={}, num:int=0, duration:float=0, netto_time:bool=False)
         |      action: callable object, which is repeatedly called (f.i. a function)
         |                If callable it must return a number, a bool or None:
         |                True, -1: end the loop
         |                False, None: next call directly follows (if not reached limit of num)
         |                positive number: time gap between the actual and the next call
         |      args: argument list of action
         |      kwargs: keyword arguments of action
         |      num: number of calls (0 stands for unlimited)
         |      duration: duration of task (if execution ends earlier, task will wait)
         |      netto_time: flag, that waiting is netto (execution of action counts extra)
       
  • Periodic: encapsulates a callable to run it multiple times with a fixed time-intervall.
    
        class Periodic(Task)
         |  Uses multithreading for periodic actions (control comes back immediately).
         |  think of task as:
         |      while not action(*args, **kwargs):
         |          time.sleep(intervall)
         |  
         |  Methods defined here:
         |  
         |  __init__(self, intervall:float, action, args:tuple=(), kwargs:dict={}, num:int=0, duration:float=0, netto_time:bool=False)
         |      intervall: intervall between two calls of action (in seconds)
         |      action: callable object, which is repeatedly called (f.i. a function)
         |          It returns a bool or None:
         |              True: end the loop
         |              False, None: next call will follow (if not reached limit of num)
         |      args: argument list of action
         |      kwargs: keyword arguments of action
         |      num: number of calls (0 stands for unlimited)
         |      duration: duration of task (if execution ends earlier, task will wait)
         |      netto_time: flag, that waiting is netto (execution of action counts extra)
       
  • Sleep: sleeps
    
        class Sleep(Task)
         |  Sleeps
         |  
         |  Methods defined here:
         |  
         |  __init__(self, seconds:float)
         |      seconds: duration of sleeping
       
All of them are instances of class Task and can be combined (in any order) to build chains of tasks. Behind the scenes, they use multithreading, which allows starting parallel tasks (even inside a task one can start parallel tasks). All tasks can be parametrized for exact timing. This helps for a high level of time control. Locking also is done in the background. Only for seldom cases one needs to think about the locking mechanism.

task objects have the following methods:


     |  append(self, task) -> 'Task'
     |      appends a task or a chain of tasks (both must be root tasks)
     |  
     |  join(self) -> None
     |      joins the thread of the task 
     |      think as: my_task.thread.join(), but evaluated when called
     |  
     |  start(self) -> 'Task'
     |      starts execution of task (finished tasks may be started again)
      
Tasks have an easy to handle API and hide the details of multithreading and locking but they are flexible to use. They are independent from the EV3 device and can be used for multiple kinds of software projects, where multithreading is needed.

Tasks are both, architecture and glue. They allow to code callable atoms and then put them together to tasks with complex functionality. The construction of the task from its atoms often needs the thinking of an architect, but all resulting Task objects have the very same simple API. The users of Task objects need no knowledge of its inner structure.

For the moment, tasks can't be stopped and continued and their error handling must be improved. These will be topics of our next lesson. We will extend the methods to:


     |  append(self, task) -> 'Task'
     |      appends a task or a chain of tasks (both must be root tasks)
     |  
     |  cont(self, gap:float=0) -> 'Task'
     |      continues a stopped task (must be a root task)
     |      gap: sets the waiting time before the next action occurs (in seconds)
     |  
     |  join(self) -> None
     |      joins the thread of the task 
     |      think as: my_task.thread.join(), but evaluated when called
     |  
     |  start(self, gap:float=0) -> 'Task'
     |      starts execution of task (finished or stopped tasks may be started again)
     |      gap: sets the waiting time, before start occurs (in seconds)
     |  
     |  stop(self) -> None
     |      stops execution as fast as possible
     |          allows to continue with method cont or restart with method start
     |          already finished tasks silently do nothing
      

Now time has come to play around with the new tools. Modify your programs The depressed giraffe and The dancing robot and use tasks. Be creative and realize some of your own ideas and develop a feeling for the task concept. When you finished your playing, come back to lesson 9.

No comments:

Post a Comment