Friday 27 May 2016

Lesson 9 - Stop and Continue

EV3 Direct commands - Lesson 09

Introduction

We are on the way, to write classes, that encapsulate an unlimited number of (related) actions into tasks objects. All tasks own the same easy handling which is independent of the type of actions. Moving a robot, reading sensor values, playing tones etc., all these procedures can be task objects with the methods: start, stop, cont (continue) and join:


     |  cont(self, gap:float=None) -> 'Task'
     |      continues a stopped task (must be a root task)
     |      
     |      Keyword Arguments:
     |      gap: sets the waiting time before the next action occurs (in seconds)
     |  
     |  join(self) -> None
     |      joins the thread of the task
     |  
     |  start(self, gap:float=0) -> 'Task'
     |      starts execution of task (finished or stopped tasks may be started again)
     |      
     |      Keyword Arguments:
     |      gap: sets the waiting time, before start occurs (in seconds)
     |  
     |  stop(self) -> None
     |      stops execution as fast as possible
     |          allows to continue with method cont or restart with method start
     |          already finished tasks silently do nothing
      
This makes the usage of task objects simple and allows to think about their dependencies instead of fighting with technical details. Task objects use multithreading. This allows to start parallel tasks or even construct tasks, which inside operate parallel actions. The creative part is the construction of task objects and this can be done behind the scenes. The outside world handles task objects, which are ready to use. The subclasses of class EV3 will produce them. This says, at the end, our class TwoWheelVehicle will return tasks, that drive the vehicle. As a consequence the vehicles movements can be stopped and continued and we can run other tasks parallel while a movement takes place.

But yet we are not users of task objects, we have to code them! Usage will be simple, coding will be not. This lesson has three topics:

  • Handling exceptions
  • Stopping tasks and preparing continuation
  • Continuing tasks
Again we use class Jukebox for tests.

Handling Exceptions

One of the open topics from last lesson is the error handling in task objects. We know from lesson 7, that throwing an exception will not influence foreign threads. In programs with multithreading, we need a central place of information about exceptions. If all our threads ask regularly if there has been an exception somewhere, they can react on it. We use a very simple mechanism, where the reaction is an exit.

The central place is a class, what else:


class ExceptionHandler:
    def __init__(self):
        self._exc = False

    def put(self, exc: Exception):
        self._exc = True

    def fire(self):
        if self._exc: sys.exit(1)
      
Its API:

    class ExceptionHandler(builtins.object)
     |  Handles Exceptions of task objects
     |  If anywhere an exceptions occured and was put to the ExceptionHandler,
     |  any thread that uses the same instance of ExceptionHandler exits,
     |  when it calls its method fire
     |  
     |  Methods defined here:
     |  
     |  __init__(self)
     |      Initialize self.  See help(type(self)) for accurate signature.
     |  
     |  fire(self)
     |      fires sys.exit(1) if an exception occured, else does nothing
     |  
     |  put(self, exc:Exception)
     |      informs, that an exception occured
     |      
     |      Arguments:
     |      exc: Exception, ignored, but subclasses may distinguish
      
Method put has a parameter exc, which is never used. This is for future subclasses, which may distinguish between different types of exceptions.

How do we use this class? Let's look at an example:


    def start(self) -> None:
        self._root._exc.fire()
        self._root._lock.acquire()
        try:
            assert self._root is self, 'only root tasks can be started'
            assert self._state in [
                STATE_INIT,
                STATE_STOPPED,
                STATE_FINISHED
            ], "can't start from state " + self._state
        except Exception as exc:
            self._root._exc.put(exc)
            self._root._lock.release()
            raise
        ...
      
Some remarks:
  • Attribute _exc holds the ExceptionHandler of the task.
  • method fire is called in an unlocked state. If the task would be locked, when the thread exits, this blocked other threads.
  • Raising the exception also occurs in an unlocked state.
  • The first thing, method start does, is asking if somewhere an exception occured. If so, its thread exits.
  • If one of the assertions throws an AssertionException, this is put to the tasks ExceptionHandler, then the exception is raised.

We formulate some rules:

  • Before locking (acquire the Lock object), always call method fire of the tasks ExceptionHandler.
  • Before raising an exception, it must be put to the tasks ExceptionHandler.
  • All raising of exceptions must occur in an unlocked state.
  • Use one ExceptionHandler for all your tasks. If not, this needs strong arguments.

As a consequence we add a class attribute to class Task:


class Task:
    _exc_default = ExceptionHandler()
      
This will be the default of all instances of class Task

We add some code to the constructor of Task objects:


    def __init__(self, action: typing.Callable, **kwargs):
        self._action = action
        self._args = kwargs.pop('args', ())
        self._kwargs = kwargs.pop('kwargs', {})
        self._join = kwargs.pop('join', False)
        self._duration = kwargs.pop('duration', None)
        self._num = kwargs.pop('num', 0)
        self._next = None
        self._root = self
        self._time_end = None
        self._netto_time = False
        self._cnt = 0
        # the following are root only attributes
        self._state = STATE_INIT
        self._thread = None
        self._lock = threading.Lock()
        self._cond = threading.Condition(self._lock)
        self._last = None
        self._activity = ACTIVITY_NONE
        self._time_action = None
        self._contained = []
        self._exc = kwargs.pop('exc', self._exc_default)
        self._exc.fire()
        assert not kwargs, 'unknown keyword arguments: ' + str(kwargs.keys())
      
The number of keyword arguments has grown to a limit, where I prefer to use a keyworded variable length argument list **kwargs.

The subclasses Periodic and Repeated also use **kwargs. F.i. class Periodic:


class Periodic(Task):
    def __init__(self, intervall: float, action: typing.Callable, **kwargs):
        self._intervall = intervall
        self._netto_time = kwargs.pop('netto_time', False)
        assert not kwargs['join'], "no keyword argument 'join' for instances of class Periodic"
        if hasattr(action, '__self__') and \
           isinstance(action.__self__, Task) and \
           action.__name__ == "start":
            kwargs.update({'join': True})
        else:
            kwargs.update({'join': False})
        super().__init__(action, **kwargs)
        assert isinstance(self._intervall, numbers.Number), 'intervall must be a number' + intervall
        assert self._intervall >= 0, 'intervall must be positive'
        assert isinstance(self._netto_time, bool), 'netto_time must be a bool value'
      

Class Sleep:


class Sleep(Task):
    def __init__(self, seconds: float, exc: ExceptionHandler=None):
        if not exc:
            exc = self._exc_default
        super().__init__(self._do_nothing, duration=seconds, exc=exc)
      

Stopping Tasks

We code more logic than needed for stopping. We also prepare continuation because we don't like to go twice trough the code.

States

We already know some states from the last lesson:


STATE_INIT = 'INIT'
STATE_STARTED = 'STARTED'
STATE_FINISHED = 'FINISHED'
      
Let's talk about their meaning:
  • INIT: The initial state of a task. When a task is constructed, but never started, its state is INIT. After a tasks start, it will never return to state INIT.
  • STARTED: The call of method start changes the tasks state from INIT or FINISHED to state STARTED. While the whole regular execution of the task, it stays in state STARTED. When the tasks last action is finished, the state changes from STARTED to FINISHED.
  • FINISHED: The final state of a task. When a task finished regularly and was not stopped, its state is FINISHED. A task in state FINISHED can be started again.
We extend these states:

STATE_INIT = 'INIT'
STATE_TO_START = 'TO_START'
STATE_STARTED = 'STARTED'
STATE_TO_STOP = 'TO_STOP'
STATE_STOPPED = 'STOPPED'
STATE_TO_CONTINUE = 'TO_CONTINUE'
STATE_FINISHED = 'FINISHED'
      
The meaning of the additional states:
  • TO_START: We set it if method start was called with an argument gap, which schedules the starting for the future. In the meantime the state is TO_START. State TO_START signals, there is no execution or sleeping in progress.
  • TO_STOP: Method stop already was called (this changed the state STARTEDTO_STOP) and the task or its contained tasks have not yet ended their last action. If a call of start follows, while the old execution still is in progress, the series of states is: STARTEDTO_STOPTO_STARTSTARTED.
  • STOPPED: The task ended an action and red the state TO_STOP. This prevents the execution of the next action or sleeping and the state changes TO_STOPSTOPPED.
  • TO_CONTINUE: Method cont was called, and it waits to execute the next action. State TO_CONTINUE signals (like INIT, STOPPED, FINISHED or TO_START), there is no execution or sleeping in progress.
While an action is executed, there can be a series of method-calls, f.i. stop, start, stop. Only the first of them results in a change of the state: STARTEDTO_STOP. We need another criterion to identify the situations after the second or third call of a method. This is the existence of the new thread (_thread_start or _thread_cont). The second call of start) creates the new thread _thread_start, the next call of stop prevents the new thread from executing an action.

The combination of the state and the existence of new threads makes the full understanding of the situation:

  • TO_STOP and _thread_start != None: methods stop and start were called while the last action was executed.
  • TO_STOP and _thread_cont != None: methods stop and cont were called while the last action was executed.
  • TO_STOP and _thread_start == None and _thread_cont == None: the last call was method stop and the last action still executes.

Responsibilities for state transitions

We look, which parts of our tasks are responsible for changes of the state (without method cont).

  • Method start and its followers _start2 and _start3:
    • From [INIT, STOPPED, FINISHED] to STARTED: if called with gap == 0.
    • From [INIT, STOPPED, FINISHED] to TO_START: if called with gap > 0.
    • Unchanged state TO_STOP: the old thread still is executing. A new thread is created and started, but the state remains TO_STOP.
    • From TO_START to STARTED: when the old thread ended and gap is over.
  • Method stop:
    • From STARTED to TO_STOP: never changes directly from STARTED to STOPPED.
    • From TO_START to STOPPED: TO_START signals, that _execute was not yet called.
    • From TO_CONTINUE to STOPPED: the old thread came to its end, the new thread did not yet call _execute.
  • Method _execute:
    • From STARTED to FINISHED: This is the regular case.
    • From TO_STOP to FINISHED: The actual action ended and there was no next one and no final sleeping.
    • From TO_STOP to TO_START: The actual action ended and a new thread _thread_start already was started from method start.
    • From TO_STOP to TO_CONTINUE: The actual action ended and a new thread _thread_cont already was started from method cont.
    • From TO_STOP to STOPPED: The actual action ended and there were no new threads.

Additional Attributes

We add attributes to class Task:


class Task:
    _exc_default = ExceptionHandler()
    _contained_register = {}
    
    def __init__(self, action: typing.Callable, **kwargs):
        ...
        # the following are root only attributes
        ...
        self._action_stop = kwargs.pop('action_stop', None)
        self._args_stop = kwargs.pop('args_stop', ())
        self._kwargs_stop = kwargs.pop('kwargs_stop', {})
        self._action_cont = kwargs.pop('action_cont', None)
        self._args_cont = kwargs.pop('args_cont', ())
        self._kwargs_cont = kwargs.pop('kwargs_cont', {})
        self._thread_start = None
        self._thread_cont = None
        self._actual = None
        self._cont_join = None
        self._time_called_stop = None
        self._restart = False
      
Their meaning:
  • _action_stop: The default stopping only prevents the next action from execution. Often it additionally needs a stopping action, f.i. stopping the sound or a movement.
  • _args_stop: The stopping action may have positional arguments.
  • _kwargs_stop: The stopping action may have keyword arguments.
  • _action_cont: like _action_stop, f.i. when _action_stop ends a movement, _action_cont restarts it.
  • _args_cont: positional arguments of _action_cont.
  • _kwargs_cont: keyword arguments of _action_cont.
  • _thread_start: We already discussed it. The old thread _thread may be in state TO_STOP when method start is called. Then attribute _thread_start holds the new thread as long as the old one also is needed.
  • _thread_cont: like thread_start, but for continuation.
  • _actual: holds the actual link in the chain. Continuation needs to know it.
  • _cont_join: If the task was stopped while joining a contained task, this attribute holds the task to join.
  • _time_called_stop: Continuation has to continue at the correct time. It takes value _time_action and adds the time distance between the call of the method stop and the time when it could start executing the next action.
  • _restart: If there was a sequence of method calls, while the state was TO_STOP, f.i. start, stop, cont, the last continuation needs to know, that is has to restart instead of continue.

Method stop

We add method stop:


    def stop(self) -> None:
        self._root._exc.fire()
        self._root._lock.acquire()
        try:
            assert self is self._root, 'only root tasks can be stopped'
            assert self._state in [
                STATE_TO_START,
                STATE_STARTED,
                STATE_TO_STOP,
                STATE_TO_CONTINUE,
                STATE_FINISHED
            ], "can't stop from state: " + self._state
            assert self._state != STATE_TO_STOP or self._thread_start or self._thread_cont, \
                "stopping is already in progress"
        except Exception as exc:
            self._root._exc.put(exc)
            self._root._lock.release()
            raise
        if self._state == STATE_FINISHED:
            self._lock.release()
            return
        if self._time_called_stop is None:
            self._time_called_stop = time.time()
        if self._activity is ACTIVITY_SLEEP:
            self._cond.notify()
        not_stopped = []
        for task in self._contained:
            if not task in self._contained_register or \
               not self._contained_register[task] is self:
                continue
            task.lock.acquire()
            if task._state in [STATE_STARTED, STATE_TO_START, STATE_TO_CONTINUE]:
                not_stopped.append(task)
            elif task._state == STATE_TO_STOP and (task._thread_start or task._thread_cont):
                not_stopped.append(task)
            task.lock.release()
        for task in not_stopped:
            task.stop()
        if self._state == STATE_STARTED:
            self._state = STATE_TO_STOP
        elif self._thread_start:
            self._thread_start = None
            if self._state == STATE_TO_START:
                self._state = STATE_STOPPED
        else:
            self._thread_cont = None
            if self._state == STATE_TO_CONTINUE:
                self._state = STATE_STOPPED
        self._lock.release()
      
Step by step we go through this code:
  • If the task already is finished, it silently does nothing:
    
            if self._state == STATE_FINISHED:
                self._lock.release()
                return
       
  • The time of the first call of stop is needed for the correct timing in method cont:
    
            if self._time_called_stop is None:
                self._time_called_stop = time.time()
       
  • If the task is sleeping, we interrupt the sleeping:
    
            if self._activity is ACTIVITY_SLEEP:
                self._cond.notify()
       
  • Stopping all contained tasks:
    
            not_stopped = []
            for task in self._contained:
                if not task in self._contained_register or \
                   not self._contained_register[task] is self:
                    continue
                task.lock.acquire()
                if task._state in [STATE_STARTED, STATE_TO_START, STATE_TO_CONTINUE]:
                    not_stopped.append(task)
                elif task._state == STATE_TO_STOP and (task._thread_start or task._thread_cont):
                    not_stopped.append(task)
                task.lock.release()
            for task in not_stopped:
                task.stop()
       
    This works recursive and stops all direct or indirect children tasks.
  • Changing state STARTEDTO_STOP:
    
            if self._state == STATE_STARTED:
                self._state = STATE_TO_STOP
       
  • If there was a call of method start, its new thread looses its reference:
    
            elif self._thread_start:
                self._thread_start = None
       
    This signals method start that the thread has to end before it reaches state STARTED
  • If there was a call of method cont, its new thread looses its reference:
    
            else:
                self._thread_cont = None
                if self._state == STATE_TO_CONTINUE:
                    self._state = STATE_STOPPED
       
    This signals method cont that the thread has to end before it reaches state STARTED

Method start

Method start has to handle the situation, when it finds a task in state TO_STOP and it got a keyword argument gap:


    def start(self, gap: float=0) -> 'Task':
        self._root._exc.fire()
        self._root._lock.acquire()
        try:
            assert isinstance(gap, numbers.Number), 'gap needs to be a number'
            assert gap >= 0, 'gap needs to be positive'
            assert self._root is self, 'only root tasks can be started'
            assert self._state in [
                STATE_INIT,
                STATE_TO_STOP,
                STATE_STOPPED,
                STATE_FINISHED
            ], "can't start from state " + self._state
            assert self._thread_start is None, "starting is already in progress"
            assert self._thread_cont is None, "continuation is already in progress"
        except Exception as exc:
            self._root._exc.put(exc)
            self._root._lock.release()
            raise
        if self._state == STATE_TO_STOP or gap > 0:
            if self._state == STATE_TO_STOP:
                self._restart = True
            else:
                self._state = STATE_TO_START
            if gap:
                self._thread_start = threading.Thread(
                    target=self._start2,
                    args=(time.time() + gap,)
                )
            else:
                self._thread_start = threading.Thread(target=self._start2)
            self._thread_start.start()
        else:
            self._start3()
            self._thread = threading.Thread(target=self._execute)
            self._thread.start()
        return self
      

Method _start2


    def _start2(self, time_action: float=None) -> None:
        if self._state == STATE_TO_STOP:
            self._lock.release()
            self._thread.join()
            self._exc.fire()
            self._lock.acquire()
        if not threading.current_thread() is self._thread_start:
            self._lock.release()
            return
        if time_action:
            gap = time_action - time.time()
            if gap > 0:
                self._activity = ACTIVITY_SLEEP
                self._cond.wait(gap)
                self._activity = ACTIVITY_NONE
                if not threading.current_thread() is self._thread_start:
                    self._lock.release()
                    return
        self._thread = self._thread_start
        self._thread_start = None
        self._start3()
        self._execute()
      
Annotations:
  • First it joins the old thread.
  • Then it tests if there was a call of method stop. If so, it returns without changing the state or executing something.
  • If method start was called with keyword argument gap, it waits until its time has come. Then it tests if meanwhile there was a call of stop.
  • Its thread becomes the thread of the task and it executes its actions.

Method _start3

These are a few lines of code, we need twice (in methods start and _start2):


    def _start3(self) -> None:
        self._state = STATE_STARTED
        self._restart = False
        self._time_called_stop = None
        self._actual = self
        self._cnt = 0
        self._time_action = time.time()
        if self._duration != None:
            self._time_end = self._time_action + self._duration
      

Method join

Joining needs to join all threads, the old and the new ones:


    def join(self) -> None:
        try:
            assert self._root is self, "only root tasks can be joined"
            assert self._state != STATE_INIT, "can't join tasks in state " + str(self._state)
        except Exception as exc:
            self._root._exc.put(exc)
            raise
        self._exc.fire()
        try: self._thread_start.join()
        except Exception: pass
        try: self._thread_cont.join()
        except Exception: pass
        try: self._thread.join()
        except Exception: pass
      

Method _execute

While a tasks action is executed or while it's sleeping, it releases its lock. This allows to execute method stop, which changes the state STARTEDTO_STOP. Method _execute needs to handle state TO_STOP and it needs to react as fast as possible:


    def _execute(self) -> None:
        while True:
            if self._root._state != STATE_STARTED:
                self._final(outstand=True)
                return
            try:
                gap = self._wrapper()
            except Exception as exc:
                self._exc.put(exc)
                raise
            self._cnt += 1
            if gap == -1 or self._num > 0 and self._cnt >= self._num:
                self._root._time_action = time.time()
                break
            if gap == 0:
                self._root._time_action = time.time()
                continue
            if self._netto_time:
                self._root._time_action = time.time() + gap
                real_gap = gap
            else:
                self._root._time_action += gap
                real_gap = self._root._time_action - time.time()
            if real_gap > 0:
                if self._root._state != STATE_STARTED:
                    self._final(outstand=True)
                    return
                self._root._activity = ACTIVITY_SLEEP
                self._root._cond.wait(real_gap)
                self._root._activity = ACTIVITY_NONE
        if self._time_end:
            self._root._time_action = self._time_end
            gap = self._root._time_action - time.time()
            if self._root._state == STATE_STARTED and gap > 0:
                self._root._activity = ACTIVITY_SLEEP
                self._root._cond.wait(gap)
                self._root._activity = ACTIVITY_NONE
            if self._root._state == STATE_STARTED:
                self._time_end = None
            elif not self is self._root:
                self._root._time_end = self._time_end
                self._time_end = None
        else:
            self._root._time_action = time.time()
        if self._next:
            self._root._actual = self._next
            self._next._cnt = 0
            self._root._time_end = None
            if self._next._duration != None:
                self._next._time_end = self._root._time_action + self._next._duration
            self._next._execute()
        else:
            self._final()
      
The last task may have an argument duration. If we stop it while its last sleeping, the root task will end with attribute _time_end but not _actual. This signals: the last sleeping was not finished.

Method _wrapper1


    def _wrapper1(self) -> None:
        if hasattr(self._action, '__self__') and \
           isinstance(self._action.__self__, Task) and \
           self._action.__name__ in ["start", "cont", "join"]:
            task = self._action.__self__
            name = self._action.__name__
            if (self._join or name is "join"):
                self._root._cont_join = task
            if name in ["start", "cont"]:
                if not task in self._root._contained:
                    self._root._contained.append(task)
                self._contained_register.update({task: self._root})
        if not hasattr(self._action, '__self__') or \
           not isinstance(self._action.__self__, Task) or \
           not self._action.__name__ in ["start", "cont"] or \
           self._action.__name__ == "start" and self._join:
            self._root._activity = ACTIVITY_BUSY
            self._root._lock.release()
            self._root._exc.fire()
      
Remarks:
  • Method cont is not time consuming, there is no need for releasing and aquiring the lock.
  • Attribute _cont_join needs to be set if we join a task.
  • If the action is the continuation of a task, we have to actualize the roots attribute _contained and the class attribute _contained_register.

Method wrapper2

The corresponding logic after the execution of an action:


    def _wrapper2(self) -> None:
        if self._join:
            self._action.__self__._thread.join()
        if not hasattr(self._action, '__self__') or \
           not isinstance(self._action.__self__, Task) or \
           not self._action.__name__ in ["start", "cont"] or \
           self._action.__name__ == "start" and self._join:
            self._root._exc.fire()
            self._root._lock.acquire()
            self._root._activity = ACTIVITY_NONE
        if hasattr(self._action, '__self__') and \
           isinstance(self._action.__self__, Task) and \
           self._action.__name__ in ["start", "stop", "cont", "join"]:
            task = self._action.__self__
            name = self._action.__name__
            state = task.state
            if self._root._cont_join and \
               (self._root._state == STATE_STARTED or \
                state == STATE_FINISHED):
                self._root._cont_join = None
            if (state == STATE_FINISHED or name == "stop") and \
               task in self._root._contained:
                self._root._contained.remove(task)
            if name == "stop" and \
               task in self._contained_register:
                self._contained_register.pop(task)
      

Method _final


    def _final(self, outstand=False) -> None:
        self._root._contained = self._join_contained()
        if self._root._state == STATE_STARTED:
            self._root._state = STATE_FINISHED
        elif self._root._state == STATE_TO_STOP:
            if not self._next and \
               not self._root._contained and \
               not self._root._time_end and \
               not outstand:
                self._root._state = STATE_FINISHED
            elif self._root._action_stop:
                self._root._action_stop(
                    *self._root._args_stop,
                    **self._root._kwargs_stop
                )
        if self._root._state == STATE_FINISHED:
            if self._root in self._contained_register:
                self._contained_register.pop(self._root)
            self._root._thread_cont = None
            self._root._actual = None
            self._root._time_action = None
        else:
            if not self._next and not outstand:
                self._root._actual = None
                self._root._time_action = None
            if self._root._thread_start:
                self._root._actual = None
                self._root._time_action = None
                self._root._state = STATE_TO_START
            elif self._root._thread_cont:
                self._root._state = STATE_TO_CONTINUE
            else:
                self._root._state = STATE_STOPPED
        if self._root._time_action and self._root._time_action < time.time():
            self._root._time_action = None
        self._root._lock.release()
      
Remarks:
  • There is a lot of logic for changing the state.
  • The stopping action may be called.
  • The new attributes need to be set.
These were all modifications of the code.

Class Jukebox

In the lessons 7 and 8 we used class Jukebox to demonstrate multithreading and the chances of the task concept. Now we modify method sound and prepare it to stop appropriate then we do the same with method song.

Modifying method sound

We change method sound:


    def sound(self, path: str, duration: float=None, repeat: bool=False) -> task.Task:
        if repeat:
            ops = b''.join([
                ev3.opSound,
                ev3.REPEAT,
                ev3.LCX(self._volume), # VOLUME
                ev3.LCS(path)          # NAME
            ])
        else:
            ops = b''.join([
                ev3.opSound,
                ev3.PLAY,
                ev3.LCX(self._volume), # VOLUME
                ev3.LCS(path)          # NAME
            ])
        if not repeat and not duration:
            return task.Task(
                self.send_direct_cmd,
                args=(ops,)
            )
        elif not repeat and duration:
            t_inner = task.Task(
                self.send_direct_cmd,
                args=(ops,),
                duration=duration,
                action_stop=self.stop
            )
            return task.Task(t_inner.start, join=True)
        elif repeat and not duration:
            t_inner = task.Task(
                self.send_direct_cmd,
                args=(ops,),
                duration=9999999,
                action_stop=self.stop,
                action_cont=self.send_direct_cmd,
                args_cont=(ops,)
            )
            return task.Task(t_inner.start, join=True)
        elif repeat and duration:
            t_inner = task.Task(
                self.send_direct_cmd,
                args=(ops,),
                duration=duration,
                action_stop=self.stop,
                action_cont=self.send_direct_cmd,
                args_cont=(ops,)
            )
            return task.Task(t_inner.start, join=True)
      
There is no direct command to continue a sound file. If you compare with our first version from lesson 8:
  • not repeat and not duration: this is unchanged, we implement no stopping because the task is not time consuming. This type of calling method sound should be used for short sound signals.
  • not repeat and duration: it stops the sound, but in case of continuation it will wait silently. Restarting does not fit the intended timing.
  • repeat and not duration: a task with endless duration has actions for stopping and continuation. It needs a final stop to end the playing of the sound file.
  • repeat and duration: this is straight foreward but later we will see, that it is not perfect.

Tests of method sound

not repeat and not duration

We stop the task directly after its start, this will change the state STARTEDTO_STOP. The task will wait until the action is finished, then change the state TO_STOPFINISHED. We run the following program:


#!/usr/bin/env python3

import ev3, ev3_sound, time

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1

t = jukebox.sound('./ui/DownloadSucces')
print("state:", t.state)
t.start()
print("state:", t.state)
t.stop()
print("state:", t.state)
time_stop = time.time()
t.join()
time_join = time.time()
print("state: {}, duration of stopping: {}, time_action: {}".format(
    t.state,
    time_join - time_stop,
    t.time_action
))
      
and get this output:

state: INIT
07:32:35.683207 Sent 0x|1D:00|2A:00|80|00:00|94:02:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
state: STARTED
state: TO_STOP
state: FINISHED, duration of stopping: 0.0002593994140625, time_action: None
      
Indeed, the call of method stop returned immediately and set the state to TO_STOP. A very short time of less than 1 thousandth sec. later, the task was finished. The sequence of states was INITSTARTEDTO_STOPFINISHED.

not repeat and duration

This uses a contained task. We print data from both tasks. Of special interest is attribute _time_end of the inner task. It holds the rest of the original duration.


#!/usr/bin/env python3

import ev3, ev3_sound, time

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1

t = jukebox.sound('./ui/DownloadSucces', duration=1)
print("state:", t.state)
t.start()
print("state: {}, state of contained task: {}".format(t.state, t._cont_join.state))
t.stop()
print("state: {}, state of contained task: {}".format(t.state, t._cont_join.state))
time_stop = time.time()
t.join()
time_join = time.time()
print("duration of stopping: {}, state: {}, time_action: {}".format(
    time_join - time_stop,
    t.state,
    t.time_action
))
print("Contained task:")
print("state: {}, actual: {}, time_end: {}".format(
    t._cont_join.state,
    t._cont_join._actual,
    t._cont_join._time_end - time.time()
))
      
The output:

state: INIT
07:55:56.404427 Sent 0x|1D:00|2A:00|80|00:00|94:02:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
state: STARTED, state of contained task: STARTED
state: TO_STOP, state of contained task: TO_STOP
07:55:56.405752 Sent 0x|07:00|2B:00|80|00:00|94:00|
duration of stopping: 0.0015096664428710938, state: STOPPED, time_action: None
Contained task:
state: STOPPED, actual: None, time_end: 0.9963183403015137
      
The program sent two direct commands. The first started the playing, the second stopped it. The stopping needed a bit more time, it had to stop the contained task too. Both tasks ended in state STOPPED and _time_end holds the rest of the original duration. When calling method cont, the task will silently wait because there is no actual action (attribute actual is None) and no action_cont.

repeat and not duration

We start an unlimited repeated playing of a sound file and stop it after three seconds:


#!/usr/bin/env python3

import ev3, ev3_sound, time

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1

t = jukebox.sound('./ui/DownloadSucces', repeat=True)
t.start()
time.sleep(3)
t.stop()
t.join()
print("Contained task:")
print("state: {}, actual: {}, time_end: {}".format(
    t._cont_join.state,
    t._cont_join._actual,
    t._cont_join._time_end - time.time()
))
print(t._cont_join._action_cont)
      
The output:

08:31:22.900092 Sent 0x|1D:00|2A:00|80|00:00|94:03:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
08:31:25.903642 Sent 0x|07:00|2B:00|80|00:00|94:00|
Contained task:
state: STOPPED, actual: None, time_end: 999999995.9944854
<bound method EV3.send_direct_cmd of <ev3_sound.Jukebox object at 0x7f61d5be19b0>>
      
The rest of the duration is very long now and the inner task it will not silently continue because attribute _action_cont is set.

repeat and duration

We don't test the stopping but we come back to it, when we test the continuation.


#!/usr/bin/env python3

import ev3, ev3_sound, time

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1

t = jukebox.sound('./ui/DownloadSucces', repeat=True, duration=3)
t.start().join()
print("state:", t.state)
      
The output:

08:41:59.082772 Sent 0x|1D:00|2A:00|80|00:00|94:03:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
08:42:02.083546 Sent 0x|07:00|2B:00|80|00:00|94:00|
state: FINISHED
      
It seems to work.

Modifying method song

Method song returns a Task object with two contained tasks, one for colors, one for tones. We managed to stop the colors, when the sequence of tones ends, but we didn't like our solution. The new method stop allows to make colors and tones independent and group the colors around the tones:


task.concat(
    task.Task(colors.start),
    task.Task(tones.start, join=True),
    task.Task(colors.stop)
)
      

Our modifications:

  • We remove method play_song. Calling jukebox.song(ev3_sound.HAPPY_BIRTHDAY).start() does the job as well as jukebox.play_song(ev3_sound.HAPPY_BIRTHDAY).
  • We remove attribute _plays.
  • We simplify method stop:
    
        def stop(self) -> None:
            self.send_direct_cmd(ev3.opSound + ev3.BREAK)
       
  • We modify the task factory song:
    
        def song(self, song: dict) -> task.Task:
            tones = task.concat(
                task.Task(
                    self._init_tone,
                    action_stop=self.stop
                ),
                task.Repeated(
                    self._next_tone,
                    args=(song,)
                ),
                task.Task(self.stop)
            )
            colors = task.Periodic(
                60 * song["beats_per_bar"] / song["tempo"],
                self._next_color,
                args=(song,)
            )
            if "upbeat" in song:
                colors = task.concat(
                    task.Sleep(60 * song["upbeat"] / song["tempo"]),
                    colors
                )
            colors = task.concat(
                task.Task(
                    self._init_color,
                    action_stop=self.change_color,
                    args_stop=(ev3.LED_GREEN,)
                ),
                colors
            )
            return task.concat(
                task.Task(colors.start),
                task.Task(tones.start, join=True),
                task.Task(colors.stop)
            )
       
All four members of the task family, Task, Repeated, Periodic and Sleep are in use. Method song returns a Task object which can be combined with other task objects.

Testing method song

We test it with this program:


#!/usr/bin/env python3

import ev3, ev3_sound, task, time

jukebox = ev3_sound.Jukebox(
    protocol=ev3.BLUETOOTH,
    host='00:16:53:42:2B:99'
)
t = task.concat(
    jukebox.song(ev3_sound.HAPPY_BIRTHDAY),
    task.Sleep(1),
    jukebox.song(ev3_sound.TRIAS)
)
t.start()
time.sleep(9)
t.stop()
t.start(2)
      
It plays Happy Birthday for 9 sec., then it stops for 2 sec. (the color changes to green), then ist starts again and plays Happy Birthday, which is followed by the trias. The handling of contained task seems to be correct!

Tasks with long-lasting actions

Stopping

We code an action that lasts two sec. The stopping is called, when the action was executed since one sec. and needs another sec. to finish:


#!/usr/bin/env python3

import task, time

def first_action():
    print("first action begin")
    time.sleep(2)
    print("first action end")

t = task.concat(
    task.Task(first_action),
    task.Task(print, args=("second action",))
)
t.start()
print("task started, state:", t.state)
time.sleep(1)
t.stop()
time_stop = time.time()
print("task stopped, state:", t.state)
t.join()
time_join = time.time()
print("duration of joining:", time_join - time_stop)
print("task joined, state:", t.state)
print("time_action:", t.time_action)
      
We expect that the state TO_STOP lasts one sec. because the stopping algorithm has to wait until the actual action is finished. Then the state will change TO_STOPSTOPPED. The output:

first action begin
task started, state: STARTED
task stopped, state: TO_STOP
first action end
duration of joining: 1.0017707347869873
task joined, state: STOPPED
time_action: None
      
Indeed, the stopping needed about one sec. The value None of property time_action says: no limitation, the next action can be started as fast as possible.

Restarting

We call a series of methods start and stop, while the stopping is in execution:


#!/usr/bin/env python3

import task, time

def first_action():
    print("first action begin")
    time.sleep(2)
    print("first action end")

t = task.concat(
    task.Task(first_action),
    task.Task(print, args=("second action",))
)
t.start()
time.sleep(1)
t.stop()
print("task stopped, state:", t.state)
t.start()
print("task started, state:", t.state, " thread_start:", t._thread_start)
t.stop()
print("task stopped, state:", t.state, " thread_start:", t._thread_start)
      
The output:

first action begin
task stopped, state: TO_STOP
task started, state: TO_STOP  thread_start: <Thread(Thread-2, started 139977212032768)>
task stopped, state: TO_STOP  thread_start: None
first action end
      
The first call of method stop changed the state STARTEDTO_STOP. From then on it needs one sec. until the action ends. In this time the program calls another start and another stop. Method start creates and starts thread _thread_start, but it does not execute anything because its reference disappeares while it is waiting for the end of the old thread.

Continue a stopped task

We prepared continuation when we realized method stop. Now we add method cont.

Method cont

Like method start, cont starts a thread and returns immediately:


    def cont(self, gap: float=None) -> 'Task':
        self._exc.fire()
        self._lock.acquire()
        try:
            assert self is self._root, 'only root tasks can be continued'
            assert gap is None or isinstance(gap, numbers.Number), 'gap needs to be a number'
            assert gap is None or gap >= 0, 'gap needs to be positive'
            assert self._state in [
                STATE_STOPPED,
                STATE_TO_STOP,
                STATE_FINISHED
            ], "can't continue from state: {} (task: {})".format(
                self._state,
                self
            )
            assert self._thread_start is None, "starting is already in progress"
            assert self._thread_cont is None, "continuation is already in progress"
        except Exception as exc:
            self._root._exc.put(exc)
            self._root._lock.release()
            raise
        if self._state == STATE_FINISHED:
            self._lock.release()
            return self
        if gap is None:
            self._thread_cont = threading.Thread(target=self._cont2)
        else:
            self._thread_cont = threading.Thread(
                target=self._cont2,
                kwargs={"time_cont": time.time() + gap}
            )
        self._thread_cont.start()
        return self
      
Annotations:
  • If the task already is finished, it does nothing and returns silently.
  • After starting the new thread, it returns without releasing the lock.
  • Unlike method start its thread never calls method _execute. Continuation has to handle contained tasks and only method _cont2 knows how to do that.

Method _cont2

Method _cont2 runs in the thread, that was started from method cont. The state of the task is either STOPPED or TO_STOP.


    def _cont2(self, time_cont: float=None, time_delta: float=None) -> None:
        if self._state == STATE_STOPPED:
            self._state = STATE_TO_CONTINUE
        elif self._state == STATE_TO_STOP:
            self._lock.release()
            self._thread.join()
            self._exc.fire()
            self._lock.acquire()
        if not threading.current_thread() is self._thread_cont:
            self._lock.release()
            return
        if time_cont:
            gap = time_cont - time.time()
            if gap > 0:
                self._activity = ACTIVITY_SLEEP
                self._cond.wait(gap)
                self._activity = ACTIVITY_NONE
                if not threading.current_thread() is self._thread_cont:
                    self._lock.release()
                    return
        if self._restart:
            self._restart = False
            self._actual = self
            self._contained = []
            self._time_action = time.time()
            if self._duration:
                self._time_end = self._time_action + self._duration
            else:
                self._time_end = None
        else:
            if self._action_cont:
                self._action_cont(*self._args_cont, **self._kwargs_cont)
            if not time_cont and not time_delta:
                time_delta = time.time() - self._time_called_stop
            elif not time_delta:
                next_time_action = self.time_action_no_lock
                if next_time_action:
                    time_delta = time.time() - next_time_action
                elif self._time_end:
                    time_delta = time.time() - self._time_called_stop
                else:
                    time_delta = -1
            if self._actual:
                if self._time_action:
                    self._time_action += time_delta
                if self._actual._time_end:
                    self._actual._time_end += time_delta
            elif self._time_end:
                self._time_end += time_delta
        self._state = STATE_STARTED
        self._time_called_stop = None
        self._thread = self._thread_cont
        self._thread_cont = None
        if self._contained:
            for task in self._contained:
                if task._state is STATE_FINISHED:
                    continue
                if not task in self._contained_register or \
                   self._contained_register[task] != self:
                    continue
                task._lock.acquire()
                task._thread_cont = threading.Thread(
                    target=task._cont2,
                    kwargs={'time_cont': time_cont, 'time_delta': time_delta}
                )
                task._thread_cont.start()
            if self._cont_join:
                self._activity = ACTIVITY_JOIN
                self._lock.release()
                self._cont_join.join()
                self._exc.fire()
                self._lock.acquire()
                self._activity = ACTIVITY_NONE
                if self._state != STATE_STARTED:
                    self._final()
                    return
        if self._actual:
            if self._time_action:
                gap = self._time_action - time.time()
                if gap > 0:
                    self._activity = ACTIVITY_SLEEP
                    self._cond.wait(gap)
                    self._activity = ACTIVITY_NONE
                    if self._state != STATE_STARTED:
                        self._final()
                        return
            self._actual._execute()
        else:
            if self._time_end:
                gap = self._time_end  - time.time()
                if gap > 0:
                    self._activity = ACTIVITY_SLEEP
                    self._cond.wait(gap)
                    self._activity = ACTIVITY_NONE
                    if self._state != STATE_STARTED:
                        self._final()
                        return
            self._time_end = None
            self._final()
      
Step by step we go through this code:
  • If the tasks old thread still is executing an action, we wait until it ends.
    
            if self._state == STATE_STOPPED:
                self._state = STATE_TO_CONTINUE
            elif self._state == STATE_TO_STOP:
                self._lock.release()
                self._thread.join()
                self._exc.fire()
                self._lock.acquire()
       
    Joining is time consuming, meanwhile the lock is released.
  • While joining, there could be a call of method stop. If so, it sets _thread_cont = None and another call of cont would creat a new _thread_cont. We control both:
    
            if not threading.current_thread() is self._thread_cont:
                self._lock.release()
                return
       
  • If the method was called with argument gap. We wait this time:
    
            if time_cont:
                gap = time_cont - time.time()
                if gap > 0:
                    self._activity = ACTIVITY_SLEEP
                    self._cond.wait(gap)
                    self._activity = ACTIVITY_NONE
                    if not threading.current_thread() is self._thread_cont:
                        self._lock.release()
                        return
       
  • If there was a call of method start between the first stop and the actual cont, we have to continue with the first link in the chain (the root task) and ignore all contained tasks.
    
            if self._restart:
                self._restart = False
                self._actual = self
                self._contained = []
                self._time_action = time.time()
                if self._duration:
                    self._time_end = self._time_action + self._duration
                else:
                    self._time_end = None
       
  • When there is a special action for continuation, it is called:
    
            else:
                if self._action_cont:
                    self._action_cont(*self._args_cont, **self._kwargs_cont)
       
  • We have to correct all timing by a time shift. This may be the time between the calls of stop and cont or argument gap made it.
    
                if not time_cont and not time_delta:
                    time_delta = time.time() - self._time_called_stop
                elif not time_delta:
                    next_time_action = self.time_action_no_lock
                    if next_time_action:
                        time_delta = time.time() - next_time_action
                    elif self._time_end:
                        time_delta = time.time() - self._time_called_stop
                    else:
                        time_delta = -1
       
    If the task is a contained task, time_delta came in as an argument. Value -1 says, it's not needed, but set. This prevents the contained tasks from repeating the calculation.
  • The correction of the timing:
    
                if self._actual:
                    if self._time_action:
                        self._time_action += time_delta
                    if self._actual._time_end:
                        self._actual._time_end += time_delta
                elif self._time_end:
                    self._time_end += time_delta
       
    Recursion will do the same shift in all contained tasks so that the original synchronisation still will be conserved.
  • The state changes to STARTED and the continuation thread becomes the thread of the task:
    
            self._state = STATE_STARTED
            self._time_called_stop = None
            self._thread = self._thread_cont
            self._thread_cont = None
       
  • All contained tasks have to continue. Their synchronization must be reconstructed. This is done by calling their method _cont2 with keyword argument time_delta:
    
            if self._contained:
                for task in self._contained:
                    if task._state is STATE_FINISHED:
                        continue
                    if not task in self._contained_register or \
                       self._contained_register[task] != self:
                        continue
                    task._lock.acquire()
                    task._thread_cont = threading.Thread(
                        target=task._cont2,
                        kwargs={'time_cont': time_cont, 'time_delta': time_delta}
                    )
                    task._thread_cont.start()
       
  • The task may have been stopped, while joining a contained task. If so, the joining must occur before the tasks next action is executed.
    
                if self._cont_join:
                    self._activity = ACTIVITY_JOIN
                    self._lock.release()
                    self._cont_join.join()
                    self._exc.fire()
                    self._lock.acquire()
                    self._activity = ACTIVITY_NONE
                    if self._state != STATE_STARTED:
                        self._final()
                        return
       
  • The execution of the next action may need another sleeping. If there is no more action, the task has to join its contained tasks. This is done by method _final.
    
            if self._actual:
                if self._time_action:
                    gap = self._time_action - time.time()
                    if gap > 0:
                        self._activity = ACTIVITY_SLEEP
                        self._cond.wait(gap)
                        self._activity = ACTIVITY_NONE
                        if self._state != STATE_STARTED:
                            self._final()
                            return
                self._actual._execute()
            else:
                if self._time_end:
                    gap = self._time_end  - time.time()
                    if gap > 0:
                        self._activity = ACTIVITY_SLEEP
                        self._cond.wait(gap)
                        self._activity = ACTIVITY_NONE
                        if self._state != STATE_STARTED:
                            self._final()
                            return
                self._time_end = None
                self._final()
       

Continuing tasks with long-lasting actions

We test the continuation with this program:


#!/usr/bin/env python3

import task, time, datetime

def action(txt):
    now = datetime.datetime.now().strftime('%H:%M:%S.%f')
    print(now, txt, "begin")
    time.sleep(2)
    now = datetime.datetime.now().strftime('%H:%M:%S.%f')
    print(now, txt, "end")

t = task.concat(
    task.Task(action, args=("first action",)),
    task.Task(action, args=("last action",))
)
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(now, "task created, state:", t.state)
t.start()
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(now, "task started, state:", t.state)
time.sleep(1)
t.stop()
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(now, "task stopped, state:", t.state)
t.cont(gap=2)
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(now, "task continued, state:", t.state)
time.sleep(1)
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(now, "waited 1 sec., state:", t.state)
time.sleep(1)
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(now, "waited 1 sec., state:", t.state)
t.join()
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(now, "task joined, state:", t.state)
      
The output:

18:54:17.673259 task created, state: INIT
18:54:17.674380 first action begin
18:54:17.674556 task started, state: STARTED
18:54:18.675986 task stopped, state: TO_STOP
18:54:18.677085 task continued, state: TO_STOP
18:54:19.676891 first action end
18:54:19.678387 waited 1 sec., state: TO_CONTINUE
18:54:20.677659 last action begin
18:54:20.679900 waited 1 sec., state: STARTED
18:54:22.680142 last action end
18:54:22.680710 task joined, state: FINISHED
      
Looks good, both actions are executed once, the second action begins two sec. after the call of cont, the states are INITSTARTEDTO_STOPTO_CONTINUESTARTEDFINISHED.

Stopping and continuing a song

Again we use class Jukebox to test contained tasks:


#!/usr/bin/env python3

import ev3, ev3_sound, time

jukebox = ev3_sound.Jukebox(
    protocol=ev3.BLUETOOTH,
    host='00:16:53:42:2B:99'
)
jukebox.verbosity = 1
t_song = jukebox.song(ev3_sound.TRIAS)
t_song.start()
time.sleep(1)
t_song.stop()
t_song.cont(2)
      
The output:

20:30:17.552945 Sent 0x|08:00|2A:00|80|00:00|82:1B:03|
20:30:17.557831 Sent 0x|0C:00|2B:00|80|00:00|94:01:01:82:06:01:00|
20:30:18.309089 Sent 0x|0C:00|2C:00|80|00:00|94:01:01:82:4A:01:00|
20:30:18.558362 Sent 0x|08:00|2D:00|80|00:00|82:1B:01|
20:30:18.559895 Sent 0x|07:00|2E:00|80|00:00|94:00|
20:30:20.560389 Sent 0x|0C:00|2F:00|80|00:00|94:01:01:82:88:01:00|
20:30:21.305400 Sent 0x|08:00|30:00|80|00:00|82:1B:05|
20:30:21.310159 Sent 0x|0C:00|31:00|80|00:00|94:01:01:82:0B:02:00|
20:30:23.555294 Sent 0x|08:00|32:00|80|00:00|82:1B:03|
20:30:23.559971 Sent 0x|07:00|33:00|80|00:00|94:00|
20:30:23.561446 Sent 0x|08:00|34:00|80|00:00|82:1B:01|
      
Synchronisation of contained tasks works correct, tones and light are synchrone in the first and the second part of the song.

Stopping and continuing a repeated sound file

Situations, where the algorithm doesn't work as we intent, help to understand the mechanism. One learns more by errors than by anything else. What's our problem? There is no direct command to continue the playing of a sound file! We can stop and restart but not continue. To continue a repeated sound file with a fixed duration, we start it again but with a modified timing (shorter duration).

We test the stopping and continuation:


#!/usr/bin/env python3

import ev3, ev3_sound, time

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1

t = jukebox.sound('./ui/DownloadSucces', duration=5, repeat=True)
t.start()
time.sleep(2)
t.stop()
time.sleep(2)
t.cont()
      
The output:

20:46:58.965333 Sent 0x|1D:00|2A:00|80|00:00|94:03:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
20:47:00.967754 Sent 0x|07:00|2B:00|80|00:00|94:00|
20:47:02.970920 Sent 0x|1D:00|2C:00|80|00:00|94:03:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
20:47:05.968598 Sent 0x|07:00|2D:00|80|00:00|94:00|
      
Looks great!

The problem of the next action

We modify the program slightly and call method cont with argument gap:


#!/usr/bin/env python3

import ev3, ev3_sound, time

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1

t = jukebox.sound('./ui/DownloadSucces', duration=5, repeat=True)
t.start()
time.sleep(2)
t.stop()
t.cont(2)
 
The output:

20:51:15.311826 Sent 0x|1D:00|2A:00|80|00:00|94:03:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
20:51:17.314737 Sent 0x|07:00|2B:00|80|00:00|94:00|
20:51:19.315760 Sent 0x|1D:00|2C:00|80|00:00|94:03:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
20:51:19.317129 Sent 0x|07:00|2D:00|80|00:00|94:00|
 
The stopping occurs immediately after the continuation, why? Calling method cont with argument gap says: wait, then continue with the next action. The next action is stop! The callers intention is, that the repeated playing is the action and she does not know, that the playing of sound files can't be continued. We tricksed when we restarted and named it continuation. This trick fails under some special conditions.

The solution: subclassing Task

The solution of the problem lies in subclassing. We modify the continuation and prevent the immediate execution of the next action. The new version of method sound:


    def sound(self, path: str, duration: float=None, repeat: bool=False) -> task.Task:
        ...
        elif repeat:
            class _Task(task.Task):
                def _final(self, **kwargs):
                    super()._final(**kwargs)
                    if self._root._time_action:
                        self._root._time_rest = self._root._time_action - time.time()
                        self._root._time_action -= self._root._time_rest
                def _cont2(self, **kwargs):
                    self._time_action += self._time_rest
                    super()._cont2(**kwargs)

            t_inner = task.concat(
                _Task(
                    self.send_direct_cmd,
                    args=(ops,),
                    duration=duration,
                    action_stop=self.stop,
                    action_cont=self.send_direct_cmd,
                    args_cont=(ops,)
                ),
                _Task(self.stop)
            )
            return task.Task(t_inner.start, join=True)
        ...
      
We shift _time_action backwards from the stopping to the sound continuation. This is what the outer world expects. When continuing, we shift in reverse direction and reconstruct the old situation. If in the meantime somebody asks for property time_action, he gets the shifted value. This will change the behaviour of method cont when argument gap is set. We again start the program. Its output:

21:01:58.678197 Sent 0x|1D:00|2A:00|80|00:00|94:03:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
21:02:00.680940 Sent 0x|07:00|2B:00|80|00:00|94:00|
21:02:02.681971 Sent 0x|1D:00|2C:00|80|00:00|94:03:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
21:02:05.682649 Sent 0x|07:00|2D:00|80|00:00|94:00|
      
Well done!

Conclusion

Stopping and continuing tasks is a complex mechanism. This lesson was hard stuff! We had to manage all the timing and synchronization, a lot of technical details. When we look from the outside on the tasks, we don't see all this details and realize the benefits:

  • It provides modularity. We can combine small tasks to medium tasks and medium tasks to complex tasks.
  • It helps to manage high complexity because the compexity of the outside API doesn't grow.
  • It helps for good design. Starting, stopping and continuation seems very natural.
  • It hides the locking and the multithreading mechanism and the synchronization of contained tasks.

We also see the drawbacks:

  • Stopping and continuing need a special design of the tasks. A simple wrapping of a callable into a task object often does not fit our needs. This complicates the coding of the robots actions.
  • Simple things become at least medium complex. Tasks are an abstraction layer, that doesn't keep simple things simple.

I would be glad to get your feedback! Our next lesson will be about class TwoWheelVehicle. We will train it to stop immediately and continue appropriate.

Saturday 14 May 2016

Lesson 8 - Organizing Tasks

EV3 Direct commands - Lesson 08

Introduction

After the sightseeing tour of our last lesson we come now to the real thing, tools that implement multithreading and help to organize multiple tasks. If you didn't read lesson 7, you need profound knowledge of multithreading. If you are not this expert, please visit lesson 7 and then come back.

The basic idea we build our tools upon are linked lists. We name them tasks. A task is an object, that can be started and then executes a sequence of callables. The final result will be somewhat complex (only inside, its outside API will be very simple). Therefore we start with a basic version and add the functionality step by step.

Combining linked lists with multithreading

Let's look at a first version:


#!/usr/bin/env python3

import threading, typing

STATE_INIT = 'INIT'
STATE_STARTED = 'STARTED'
STATE_FINISHED = 'FINISHED'

def concat(*tasks) -> 'Task':
    chain = None
    for task in tasks:
        assert isinstance(task, Task), 'tasks must be instances of class Task'
        if not chain:
            chain = task
        else:
            chain.append(task)
    return chain

class Task:
    def __init__(
            self,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
    ):
        assert isinstance(action, typing.Callable), \
            "action needs to be a callable"
        assert isinstance(args, tuple), 'args needs to be a tuple'
        assert isinstance(kwargs, dict), 'kwargs needs to be a dictionary'
        self._action = action
        self._args = args
        self._kwargs = kwargs
        self._next = None
        self._root = self
        # the following are root only attributes
        self._state = STATE_INIT
        self._thread = None
        self._lock = threading.Lock()
        self._last = self

    def append(self, task: 'Task') -> 'Task':
        self._lock.acquire()
        assert isinstance(task, Task), "task needs to be a Task instance"
        assert self._root is self, "only root tasks can be appended"
        assert task._root is task, "both tasks need to be root tasks"
        assert self._state in [STATE_INIT, STATE_FINISHED], "can't append to tasks in state " + self._state
        assert task._state in [STATE_INIT, STATE_FINISHED], "can't append tasks in state " + task._state
        self._last._next = task
        self._last = task._last
        task.root = self
        self._lock.release()
        return self

    def start(self) -> 'Task':
        self._lock.acquire()
        assert self._root is self, "only root tasks can be started"
        assert self._state in [STATE_INIT, STATE_FINISHED], "can't start from state " + self._state
        self._state = STATE_STARTED
        self._thread = threading.Thread(target=self._execute)
        self._thread.start()
        return self

    def join(self) -> None:
        self._thread.join()
        
    def _execute(self) -> None:
        self._root._lock.release()
        self._action(*self._args, **self._kwargs)
        self._root._lock.acquire()
        if self._next:
            self._next._execute()
        else:
            self._root._state = STATE_FINISHED
            self._root._lock.release()
            return

    @property
    def state(self) -> str:
        self._lock.acquire()
        value = self.state_no_lock
        self._lock.release()
        return value
    
    @property
    def state_no_lock(self) -> str:
        assert self._root is self, "only root tasks can be asked about their state"
        return self._state
    
    @property
    def lock(self) -> threading.Lock:
        assert self._root is self, "only root tasks can be asked about their lock"
        return self._lock

    @property
    def root(self):
        return self._root
    @root.setter
    def root(self, task):
        self._root = task
        if self._next:
            self._next.root = task
      
Remarks:
  • A task object is a linked list (chain of tasks), that starts with a root task, which is followed by an unlimited number of links. All of them are Task objects.
  • All Task objects are created as root tasks. Method append does what its name says, it appends a root task (Task object) to another root task. The result is a chain of tasks, where the appended task is no more a root task.
  • Function concat is syntactic sugar, which allows to build a chain of tasks from a tuple of Task objects.
  • All comunication from and to the outside world is done via the root task, which represents the whole chain of tasks. The following tasks in the linked list become inaccessible for the outside world.
  • Attributes of all tasks:
    • _action: callable object (f.i. a function or a method).
    • _args: argument list of _action.
    • _kwargs: keyword arguments of _action.
    • _next: pointer to the next Task object in the linked list.
    • _root: pointer to the root task (also a Task object).
  • Additional attributes of root tasks:
    • _state: actual state of the task (or chain of tasks).
    • _thread: The thread, that does the execution (Thread instance from module threading).
    • _lock: The Lock object (from module threading).
    • _last: pointer to the last task in the chain (method append needs it).
  • A root task can be identified by self._root is self.
  • Method start creates a new thread, that does the execution. start itself returns immediately. If you need the started thread for your timing, you can call method join (f.i. task.start().join()).
  • Method _execute is the heart of the execution. Every task executes its action and then calls method _execute of the next link in the chain.
  • Nearly all methods and properties start with aquiring the Lock object. This guaranties an exclusive access and finds an object in a consitent state. At the end of the method (or property logic), the lock is released.
  • The lock is released, when _action starts execution and again acquired, after _action is finished. This says another thread gets access to a Task object, when:
    • its state is STATE_INIT (before started),
    • its state is STATE_FINISHED,
    • one of its actions is actually executed.
  • All the properties, that read changeable data need a locking mechanism. If the outside world has to read more than one property, it can explicitly lock and then call the no-lock-version of the properties:
    
    ...
    my_task.lock.acquire()
    if my_task.state_no_lock is task.STATE_FINISHED:
        print("finished")
    elif my_task.state_no_lock is task.STATE_STARTED:
        print("started")
    else:
        print("initial")
    my_task.lock.release()
    ...   
       
    If no explicit locking would be done and the state changed just between the two accesses (STARTED -> FINISHED), it printed an incorrect state (INIT). This case is for demonstration only, a better solution would be:
    
    ...
    state = my_task.state
    if state is task.STATE_FINISHED:
        print("finished")
    elif state is task.STATE_STARTED:
        print("started")
    else:
        print("initial")
    ...   
       
  • the setter of property root is recursive. If set in a former root task, it sets the new root for the whole chain of tasks.

Task objects versus Thread objects

Actually Task objects are very similar to Thread objects. Both have methods start and join. The main difference is that Task objects allow to build chains and handle the chains like single Task objects.

Parallel executed tasks

As a first test, we run two tasks parallel:


#!/usr/bin/env python3

import ev3, ev3_sound, ev3_vehicle, task

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
vehicle = ev3_vehicle.TwoWheelVehicle(0.02128, 0.1175, ev3_obj=jukebox)

speed = 10
t1 = task.Task(
    vehicle.drive_turn,
    args=(speed, 0, 720)
)
t2 = task.Task(
    jukebox.play_song,
    args=(ev3_sound.HAPPY_BIRTHDAY,)
)
t1.start()
t2.start()
t1.join()
t2.join()
vehicle.stop()
      
The vehicle turns two circles on place and plays Happy Birthday. The longer of both tasks does the timing.

If we use Thread objects instead of Task objects, this reads as:


#!/usr/bin/env python3

import ev3, ev3_sound, ev3_vehicle, threading

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
vehicle = ev3_vehicle.TwoWheelVehicle(0.02128, 0.1175, ev3_obj=jukebox)

speed = 10
t1 = threading.Thread(
    target=vehicle.drive_turn,
    args=(speed, 0, 720)
)
t2 = threading.Thread(
    target=jukebox.play_song,
    args=(ev3_sound.HAPPY_BIRTHDAY,)
)
t1.start()
t2.start()
t1.join()
t2.join()
vehicle.stop()
    
There is no real difference, the handling of both alternatives is very similar.

Chains of tasks

We run a chain of tasks:


#!/usr/bin/env python3

import ev3, ev3_sound, ev3_vehicle, task

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
vehicle = ev3_vehicle.TwoWheelVehicle(0.02128, 0.1175, ev3_obj=jukebox)

speed = 10
task.concat(
    task.Task(
        vehicle.drive_turn,
        args=(speed, 0, 360)
    ),
    task.Task(
        vehicle.drive_turn,
        args=(speed, 0, 360),
        kwargs={"right_turn": True}
    ),
    task.Task(
        vehicle.stop
    )
).start()
jukebox.play_song(ev3_sound.HAPPY_BIRTHDAY)
      
The movement of the vehicle is a chain of three links, driving and music run parallel.

We use a Thread object instead of Task objects:


#!/usr/bin/env python3

import ev3, ev3_sound, ev3_vehicle, threading

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
vehicle = ev3_vehicle.TwoWheelVehicle(0.02128, 0.1175, ev3_obj=jukebox)

speed = 10

def do_it():
    vehicle.drive_turn(speed, 0, 360)
    vehicle.drive_turn(speed, 0, 360, right_turn=True)
    vehicle.stop()
    
threading.Thread(
    target=do_it
).start()
jukebox.play_song(ev3_sound.HAPPY_BIRTHDAY)
      
O.k. it works, but in its actual state it is no real progress to use Task objects instead of Thread objects. It needs additional functionality to make it a valuable tool.

Repeated actions

Let's look at some scenarios and prove if our class Task is able to execute them. Here they are:

  • Every 0.2 sec. your EV3 device reads the actual free distance from its infrared sensor. This is repeated 100 times. The times and data are written to a file.
  • Your application drives a vehicle: Every 0.5 sec. it tests if there is no barrier in front of the vehicle, if the distance is smaller than 30 cm, it stops the motors.
  • Your application watches a door: Every 5 sec. it tests, if the touch sensor is touched, if not, it plays some sound signal.
  • Your application is a vehicle, that follows a light. Every 20 sec. its light sensor scans all directions for the brightest source of light. Then the orientation of the vehicle is changed to this direction.
All these scenarios don't fit our tool. We need some modifications to deal with repeated actions. Hopefully you remember class TwoWheelVehicle of lesson 6. We had a very similar situation, where functions _test_pos and _test_o returned numbers:
  • value == -1: signals the caller to finish the loop.
  • value > 0: value is the time to wait until the next call (in sec.).
  • value == 0: call again without any waiting.

We need some new constants and we import modules time and numbers:


#!/usr/bin/env python3

import threading, typing, time, numbers

...

ACTIVITY_NONE = 'NONE'
ACTIVITY_BUSY = 'BUSY'
ACTIVITY_SLEEP = 'SLEEP'
  

We change the constructor of class Task:


    def __init__(
            self,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
    ):
        ...
        self._num = 0
        self._cnt = 0
        # the following are root only attributes
        ...
        self._cond = threading.Condition(self._lock)
        self._activity = ACTIVITY_NONE
        self._time_action = None
      
The meaning of the new attributes:
  • _num: maximum calls number of _action (value 0 stands for unlimited).
  • _cnt: counter of _action calls.
  • _cond: used for interruptable sleeping.
  • _activity: actual activity type (ACTIVITY_NONE, ACTIVITY_BUSY or ACTIVITY_SLEEP).
  • _time_action: time of the actual call of _action (when _activity is ACTIVITY_BUSY) or the next call of _action (when _activity is ACTIVITY_SLEEP).

We add two lines of code to method start:


    def start(self) -> None:
        ...
        self._cnt = 0
        self._time_action = time.time()
        self._thread = threading.Thread(target=self._execute)
        self._thread.start()
      

The new version of method _execute:


    def _execute(self) -> None:
        while True:
            gap = self._wrapper()
            self._cnt += 1
            if gap == -1 or self._num > 0 and self._cnt >= self._num:
                self._root._time_action = time.time()
                break
            if gap == 0:
                self._root._time_action = time.time()
                continue
            self._root._time_action += gap
            real_gap = self._root._time_action - time.time()
            if real_gap > 0:
                self._root._activity = ACTIVITY_SLEEP
                self._root._cond.wait(real_gap)
                self._root._activity = ACTIVITY_NONE
        self._root._time_action = time.time()
        gap = 0
        if gap > 0:
            self._root._activity = ACTIVITY_SLEEP
            self._root._cond.wait(gap)
            self._root._activity = ACTIVITY_NONE
        if self._next:
            self._next._cnt = 0
            self._next._execute()
        else:
            self._final()
      
with new methods, that wrapp _action:

    def _wrapper(self) -> int:
        self._wrapper1()
        self._action(*self._args, **self._kwargs)
        self._wrapper2()
        return -1
    
    def _wrapper1(self):
        self._root._activity = ACTIVITY_BUSY
        self._root._lock.release()

    def _wrapper2(self):
        self._root._lock.acquire()
        self._root._activity = ACTIVITY_NONE
      
returning value -1 guaranties that the loop is executed once and no sleeping will happen. This sounds funny, we code some new logic which is not used. The explanation is: Task objects will never use it, but their subclasses! It allows to subclass Task with unchanged method _execute but new method _wrapper.

We add a new method, that prepares a consistent combination of attributes in case of returning:


    def _final(self) -> None:
        self._root._state = STATE_FINISHED:
        self._root._time_action = None
        self._root._lock.release()
      

We add some properties:


    @property
    def time_action(self) -> float:
        self._root._lock.acquire()
        assert self._root is self, 'only root tasks can be asked about their time_action'
        value = self.time_action_no_lock
        self._root._lock.release()
        return value

    @property
    def time_action_no_lock(self) -> float:
        return self._time_action

    @property
    def activity(self) -> str:
        self._root._lock.acquire()
        assert self._root is self, 'only root tasks can be asked about their activity'
        value = self.activity_no_lock
        self._root._lock.release()
        return value

    @property
    def activity_no_lock(self) -> str:
        return self._activity
      

Class Repeated

We subclass Task:


class Repeated(Task):
    def __init__(
            self,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
            num: int=0
    ):
        super().__init__(action, args, kwargs)
        assert isinstance(num, int), 'num must be an integer'
        assert num >= 0, 'num must be positive'
        self._num = num

    def _wrapper(self):
        self._wrapper1()
        value = self._action(*self._args, **self._kwargs)
        assert isinstance(value, Task) or \
            isinstance(value, numbers.Number) or \
            isinstance(value, bool) or \
            value is None, \
            'action needs to return a task, a number, a boolean or None'
        assert not isinstance(value, numbers.Number) or \
            value == -1 or \
            value >= 0, \
            'if action return a number, it must be positive or -1'
        if value is True:
            rc = -1
        elif  isinstance(value, Task) or value is False or value is None:
            rc = 0
        else:
            rc = value
        self._wrapper2()
        return rc
      
It's the callable _action that controls the execution by its return value. There are three types of return values (None, bool, int):
  • None: There is no waiting and the number of calls of _action must be limited by num.
  • bool: There is no waiting, the calling ends, when value True is returned.
  • int:
    • -1: no further call (of _action).
    • 0: no waiting, call again as soon as possible.
    • > 0: wait, then call again.
num is an alternative to control the execution. If set, it is an upper limit for the number of calls of _action. It must be set when the Repeated object is constructed.

Tests

We play the trias three times:


#!/usr/bin/env python3

import ev3, ev3_sound, task

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
task.Repeated(
    jukebox.play_song,
    args=(ev3_sound.TRIAS,),
    num=3
).start()
      

Task factories

I feel confident in the task concept and try to organize as much actions as possible in tasks. The best way to this target is implementing tasks into the subclasses of class EV3.

Method song of class Jukebox

We start with class Jukebox and add a method song:


    def song(self, song: dict) -> task.Task:
        return task.concat(
            task.Task(
                self._init_tone
            ),
            task.Repeated(
                self._next_tone,
                args=(song,)
            ),
            task.Task(
                self.stop
            )                
        )
      
This method returns a Task object that plays the song, when started. It is a chain of tasks with three links, but this is an internal information. The ouside world knows: it's a task, which can be concatenated or started. We test it with this program:

#!/usr/bin/env python3

import ev3, ev3_sound, task

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.song(ev3_sound.HAPPY_BIRTHDAY).start()
      
We miss the lightshow, which will be added soon. Everything seems to work. If you look at the above described scenarios, we already have the tools to realize them. Periodic, our next subclass of Task will be syntactic sugar. It prevents from writing wrappers around callables.

Periodic actions

All of our scenarios knew the time distance between two calls of _action from the beginning. This allows to give them as sttributes into the constructor of the objects. It does not help for transparency, when the time distance is returned by method _action. Our new subclass Periodic will better fit this situation:


class Periodic(Task):
    def __init__(
            self,
            intervall: float,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
            num: int=0
    ):
        super().__init__(action, args, kwargs)
        assert isinstance(intervall, numbers.Number), 'intervall must be a number'
        assert intervall >= 0, 'intervall must be positive'
        assert isinstance(num, int), 'num must be an integer'
        assert num >= 0, 'num must be positive'
        self._intervall = intervall
        self._num = num

    def _wrapper(self):
        self._wrapper1()
        value = self._action(*self._args, **self._kwargs)
        assert isinstance(value, Task) or isinstance(value, bool) or value is None, \
            'action needs to return a task, a boolean or None'
        if value is True:
            rc = -1
        else:
            rc = self._intervall
        self._wrapper2()
        return rc
      
The waiting time is set, when the constructor is called (parameter intervall). I think, a good and descriptive variant. Look at method _wrapper, the callable _action may return two types of values (None, bool):
  • None: You have to limit the number of calls of _action by setting num.
  • bool: The calling ends, when value True is returned.

Test

We play the trias three times, once per minute:


#!/usr/bin/env python3

import ev3, ev3_sound, task

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
task.Periodic(
    60,
    jukebox.play_song,
    args=(ev3_sound.TRIAS,),
    num=3
).start()
      

Exact timing

For the moment, exact timing works for repetitions but not for chains of tasks. We want:

  • the possibility to switch to netto-time (that says the waiting for the next call of _action does not depend from the duration of its last execution),
  • a chance to set a fixed duration for a task.
This says we have to add some functionality. The new constructor:

    def __init__(
            self,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
            duration: float=None
    ):
        ...
        assert duration is None or isinstance(duration, numbers.Number), \
            'duration needs to be a number'
        assert duration is None or duration >= 0, \
            'duration needs to be positive'
        ...
        self._duration = duration
        self._time_end = None
        self._netto_time = False
        # the following are root only attributes
        ...
      
The meanings of the new attributes:
  • _duration: duration for this task. If the task finishes later, the following tasks have to compensate.
  • _time_end: holds the time, when this task has to be finished (is set when executed).
  • _netto_time: flag, that waiting is netto (execution of action counts extra)

We change method start:


    def start(self) -> None:
        self._lock.acquire()
        assert self._root is self, "only root tasks can be started"
        assert self._state in [STATE_INIT, STATE_FINISHED], \
            "can't start from state " + self._state
        self._state = STATE_STARTED
        self._cnt = 0
        self._time_action = time.time()
        if self._duration != None:
            self._time_end = self._time_action + self._duration
        self._thread = threading.Thread(target=self._execute)
        self._thread.start()
      

The updated method _execute:


    def _execute(self) -> None:
        while True:
            gap = self._wrapper()
            self._cnt += 1
            if gap == -1 or self._num > 0 and self._cnt >= self._num:
                self._root._time_action = time.time()
                break
            if gap == 0:
                self._root._time_action = time.time()
                continue
            if self._netto_time:
                self._root._time_action = time.time() + gap
                real_gap = gap
            else:
                self._root._time_action += gap
                real_gap = self._root._time_action - time.time()
            if real_gap > 0:
                self._root._activity = ACTIVITY_SLEEP
                self._root._cond.wait(real_gap)
                self._root._activity = ACTIVITY_NONE
        if self._time_end:
            self._root._time_action = self._time_end
            self._time_end = None
            gap = self._root._time_action - time.time()
            if gap > 0:
                self._root._activity = ACTIVITY_SLEEP
                self._root._cond.wait(gap)
                self._root._activity = ACTIVITY_NONE
        else:
            self._root._time_action = time.time()
        if self._next:
            self._next._cnt = 0
            if self._next._duration != None:
                self._next._time_end = self._root._time_action + self._next._duration
            self._next._execute()
        else:
            self._final()
      
There is an additional call of method sleep at the end of the task. This call guaranties, that the task lasts at least the duration, which was set by the constructor.

_netto_time is relevant for the classes Repeated and Periodic only. We need to change their constructors. The new constructor of class Repeated:


    def __init__(
            self,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
            num: int=0,
            duration: float=0,
            netto_time: bool=False
    ):
        super().__init__(action, args, kwargs, duration=duration)
        assert isinstance(num, int), 'num must be an integer'
        assert num >= 0, 'num must be positive'
        assert isinstance(netto_time, bool), 'netto_time must be a bool value'
        self._num = num
        self._netto_time = netto_time
      

The new constructor of class Periodic:


    def __init__(
            self,
            intervall: numbers.Number,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
            num: int=0,
            duration: float=0,
            netto_time: bool=False
    ):
        super().__init__(action, args, kwargs, duration=duration)
        assert isinstance(intervall, numbers.Number), 'intervall must be a number'
        assert intervall >= 0, 'intervall must be positive'
        assert isinstance(num, int), 'num must be an integer'
        assert num >= 0, 'num must be positive'
        assert isinstance(netto_time, bool), 'netto_time must be a bool value'
        self._intervall = intervall
        self._num = num
        self._netto_time = netto_time
      

Subclass Sleep

This allows to create another subclass:


class Sleep(Task):
    def __init__(self, seconds: float):
        super().__init__(self._do_nothing, duration=seconds)

    def _do_nothing(self): return -1
      
Class Sleep will allow to be stopped, but this is the future. For the moment it is just an alternative for:

Task(time.sleep, args=(seconds,))
      
which is able to compensate the time deviations of previous tasks.

Tests

We test the compensation:


#!/usr/bin/env python3

import ev3, ev3_sound, task, time

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1

task.concat(
    task.Task(
        time.sleep,
        args=(1,),
        duration=0.5
    ),
    task.Periodic(
        1,
        jukebox.play_tone,
        args=("a'", 0.1),
        num=4
    )
).start()
      
Sleeping for one second with a duration of 0.5 sec.! The chain of tasks will be half a sec. late, when it has finished its first task (sleep). The second task (play_tone) has to compensate this delay. Its output:

10:27:54.356145 Sent 0x|0D:00|2A:00|80|00:00|94:01:01:82:B8:01:81:64|
10:27:54.855296 Sent 0x|0D:00|2B:00|80|00:00|94:01:01:82:B8:01:81:64|
10:27:55.854861 Sent 0x|0D:00|2C:00|80|00:00|94:01:01:82:B8:01:81:64|
10:27:56.854826 Sent 0x|0D:00|2D:00|80|00:00|94:01:01:82:B8:01:81:64|
      
Between the first call of play_tone and the second we see a time distance of half a sec. This compensates the delay. From then on the time distance is one sec. We recognize, that the usage of duration allows exact timing and also can prevent exact timing.

We test class Sleep with a program, that uses tasks for a little lightshow:


#!/usr/bin/env python3

import task, ev3, ev3_sound

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1

t_green = task.Periodic(2, jukebox.change_color, args=(ev3.LED_GREEN,), num=4)
t_red = task.Periodic(4, jukebox.change_color, args=(ev3.LED_RED,), num=2)
t_orange = task.Periodic(4, jukebox.change_color, args=(ev3.LED_ORANGE,), num=2)

t = task.concat(
    task.Task(t_red.start),
    task.Sleep(1),
    task.Task(t_green.start),
    task.Sleep(1),
    task.Task(t_orange.start),
    task.Task(t_green.join)
)
t.start().join()
print("done")
      
Here we have four tasks, t_green, t_red, t_orange and t. t is the master, that schedules the starting of the three other tasks. When t_orange is started, t joins the task of t_green. We want its thread to be joinable (for timing purpose, the join has to end when the lightshow ends). Its output:

11:26:32.103894 Sent 0x|08:00|2A:00|80|00:00|82:1B:02|
11:26:33.105693 Sent 0x|08:00|2B:00|80|00:00|82:1B:01|
11:26:34.107247 Sent 0x|08:00|2C:00|80|00:00|82:1B:03|
11:26:35.106055 Sent 0x|08:00|2D:00|80|00:00|82:1B:01|
11:26:36.104310 Sent 0x|08:00|2E:00|80|00:00|82:1B:02|
11:26:37.106052 Sent 0x|08:00|2F:00|80|00:00|82:1B:01|
11:26:38.107876 Sent 0x|08:00|30:00|80|00:00|82:1B:03|
11:26:39.106062 Sent 0x|08:00|31:00|80|00:00|82:1B:01|
done
      
It works and we learned, that tasks can start other tasks. We plan to make our tasks stoppable, which says that tasks inside of tasks (contained tasks) need to be stopped too. The conclusion is, we have to change class Task once more.

Contained tasks

We add a class attribute _contained_register to class Task:


    class Task:
        _contained_register = {}
      
Remarks:
  • This will hold all contained tasks with their parent tasks. It's a class attribute that exists once and can be accessed from all instances of class Task. It allows to ask if task1 is cild of (contained in) task2:
    
        if task1 in task.Task._contained_register and task.Task._contained_register[task1] is task2:
            print(task1, "is child of", task2)
       
  • The access is from the child to the parent. We delete an entry, when a task is finished:
    
        self._root._state = STATE_FINISHED
        if self._root in self._contained_register:
            self._contained_register.pop(self._root)
       

We add some code to the constructor:


class Task:
    def __init__(
            self,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
            join: bool=False,
            duration: float=None
    ):
        ...
        assert isinstance (join, bool), 'join needs to be a bool value'
        assert not join or hasattr(action, '__self__'), 'only bounded methods can be joined'
        assert not join or isinstance(action.__self__, Task), 'only instances of Task can be joined'
        assert not join or action.__name__ == "start", 'only method start can be joined'
        ...
        self._join = join
        ...
        # the following are root only attributes
        ...
        self._contained = []
      
The meanings of the new attributes:
  • _join: A flag, that signals if the contained task will be joined when starting it. This says the starting task will wait until the started task is finished.
  • _contained: Holds the tasks, which where started as contained (cild) tasks and may still run. In case of stopping the task, these have to be stopped too. This is the opposite direction than _contained_register: from the parent to the child. Its a list because a task may have multiple contained tasks.

Methods _wrapper1 and _wrapper2 also need to be changed:


    def _wrapper1(self) -> None:
        if hasattr(self._action, '__self__') and \
           isinstance(self._action.__self__, Task) and \
           self._action.__name__ in ["start", "join"]:
            task = self._action.__self__
            name = self._action.__name__
            if name == "start":
                if not task in self._root._contained:
                    self._root._contained.append(task)
                self._contained_register.update({task: self._root})
        if not hasattr(self._action, '__self__') or \
           not isinstance(self._action.__self__, Task) or \
           not self._action.__name__ == "start" or \
           self._action.__name__ == "start" and self._join:
            self._root._activity = ACTIVITY_BUSY
            self._root._lock.release()

    def _wrapper2(self) -> None:
        if self._join:
            self._action.__self__._thread.join()
        if not hasattr(self._action, '__self__') or \
           not isinstance(self._action.__self__, Task) or \
           not self._action.__name__ == "start" or \
           self._action.__name__ == "start" and self._join:
            self._root._lock.acquire()
            self._root._activity = ACTIVITY_NONE
        if hasattr(self._action, '__self__') and \
           isinstance(self._action.__self__, Task) and \
           self._action.__name__ in ["start", "join"]:
            task = self._action.__self__
            state = task.state
            if state == STATE_FINISHED and \
               task in self._root._contained:
                self._root._contained.remove(task)
      
This adds all started tasks to the root tasks list _contained and guaranties that the root task knows all contained tasks which run parallel. A few remarks:
  • Attribute __self__ of a method (bounded callable) holds the object, the method belongs to.
  • Attribute __name__ of a callable holds its name.
  • These two attributes help to identify contained tasks (_action calls method start of a Task instance):
    
            if hasattr(self._action, '__self__') and \
               isinstance(self._action.__self__, Task) and \
               self._action.__name__ == "start":
       
  • Starting a task is not time consuming, therefore we do not release the lock. Only if we join the started task.

Method _final needs to be changed:


    def _final(self) -> None:
        self._root._contained = self._join_contained()
        self._root._state = STATE_FINISHED
        if self._root in self._contained_register:
            self._contained_register.pop(self._root)
        self._root._time_action = None
        self._root._lock.release()
      
with a new method:

    def _join_contained(self) -> list:
        contained = self._root._contained
        self._root._activity = ACTIVITY_JOIN
        self._root._lock.release()
        not_finished = []
        for task in contained:
            if not task in self._contained_register or \
               not self._contained_register[task] is self._root:
                continue
            task.join()
            if task.state != STATE_FINISHED:
                not_finished.append(task)
        self._root._lock.acquire()
        self._root._activity = ACTIVITY_NONE
        return not_finished
      

The call of method _join_contained guaranties, that a task never is finished, before all its contained tasks are finished. We add a new activity:


ACTIVITY_NONE = 'NONE'
ACTIVITY_BUSY = 'BUSY'
ACTIVITY_SLEEP = 'SLEEP'
ACTIVITY_JOIN = 'JOIN'
      

We change property time_action_no_lock:


    @property
    def time_action_no_lock(self) -> float:
        min = None
        if not hasattr(self._action, '__self__') or \
           not isinstance(self._action.__self__, Task) or \
           not self._action.__name__ in ["start", "join"]:
            min = self._time_action
        for task in self._contained:
            act = task.time_action
            if min is None:
                min = act
            elif act != None and act < min:
                min = act
        return min
      
The earliest time of any action is returned, may it be of the tasks own action or one of its direct or indirect contained tasks (recursion). Starting or joining a task is no action, we ignore them. What counts is the next action of the contained task, which is considered by recursion.

I think, it is not a good idea, when instances of Repeated or Periodic start unjoined tasks. Therefore we don't add a parameter join to their constructors. This says, they can contain tasks but these are always joined tasks. The constructor of Repeated:


    def __init__(
            self,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
            num: int=0,
            duration: float=0,
            netto_time: bool=False
    ):
        if hasattr(action, '__self__') and \
           isinstance(action.__self__, Task) and \
           action.__name__ == "start":
            super().__init__(action, args, kwargs, join=True, duration=duration)
        else:
            super().__init__(action, args, kwargs, duration=duration)
        ...
      

The constructor of Periodic:


    def __init__(
            self,
            intervall: float,
            action: typing.Callable,
            args: tuple=(),
            kwargs: dict={},
            num: int=0,
            duration: float=0,
            netto_time: bool=False
    ):
        if hasattr(action, '__self__') and \
           isinstance(action.__self__, Task) and \
           action.__name__ == "start":
            super().__init__(action, args, kwargs, join=True, duration=duration)
        else:
            super().__init__(action, args, kwargs, duration=duration)
        ...
      

Testing contained classes

We change our lightshow-program:


#!/usr/bin/env python3

import task, ev3, ev3_sound

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1

t_green = task.Periodic(2, jukebox.change_color, args=(ev3.LED_GREEN,), num=4)
t_red = task.Periodic(4, jukebox.change_color, args=(ev3.LED_RED,), num=2)
t_orange = task.Periodic(4, jukebox.change_color, args=(ev3.LED_ORANGE,), num=2)

task.concat(
    task.Task(t_red.start),
    task.Sleep(1),
    task.Task(t_green.start),
    task.Sleep(1),
    task.Task(t_orange.start)
).start().join()
print("done")
      
There is no need for joining t_green any more, this is done inside. This programs output:

12:39:08.368138 Sent 0x|08:00|2A:00|80|00:00|82:1B:02|
12:39:09.370120 Sent 0x|08:00|2B:00|80|00:00|82:1B:01|
12:39:10.373537 Sent 0x|08:00|2C:00|80|00:00|82:1B:03|
12:39:11.371645 Sent 0x|08:00|2D:00|80|00:00|82:1B:01|
12:39:12.368623 Sent 0x|08:00|2E:00|80|00:00|82:1B:02|
12:39:13.370486 Sent 0x|08:00|2F:00|80|00:00|82:1B:01|
12:39:14.374323 Sent 0x|08:00|30:00|80|00:00|82:1B:03|
12:39:15.370430 Sent 0x|08:00|31:00|80|00:00|82:1B:01|
done
      

Modifying the task factory

We add another method to class Jukebox, which also is a task factory.

Method sound

Method sound plays a sound file. It's API:


     |  sound(self, path:str, duration:float=None, repeat:bool=False) -> task.Task
     |      returns a Task object, that plays a sound file
     |      
     |      Attributes:
     |      path: name of the sound file (without extension ".rsf")
     |      
     |      Keyword Attributes:
     |      duration: duration of the sound file (in sec.)
     |      repeat: flag, if repeatedly playing
      

The code:


    def sound(self, path: str, duration: float=None, repeat: bool=False) -> task.Task:
        if repeat:
            ops = b''.join([
                ev3.opSound,
                ev3.REPEAT,
                ev3.LCX(self._volume), # VOLUME
                ev3.LCS(path)          # NAME
            ])
        else:
            ops = b''.join([
                ev3.opSound,
                ev3.PLAY,
                ev3.LCX(self._volume), # VOLUME
                ev3.LCS(path)          # NAME
            ])
        if not duration:
            return task.Task(
                self.send_direct_cmd,
                args=(ops,)
            )
        elif not repeat:
            return task.Task(
                self.send_direct_cmd,
                args=(ops,),
                duration=duration
            )
        else:
            return task.concat(
                task.Task(
                    self.send_direct_cmd,
                    args=(ops,),
                    duration=duration
                ),
                task.Task(self.stop)
            )
      

We test it with this program:


#!/usr/bin/env python3

import ev3, ev3_sound

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1

t = jukebox.sound('./ui/DownloadSucces', duration=5, repeat=True).start()
      
It's output:

12:43:28.528101 Sent 0x|1D:00|2A:00|80|00:00|94:03:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
12:43:33.528569 Sent 0x|07:00|2B:00|80|00:00|94:00|
      

A second test where we wrap a Repeated around the Task:


#!/usr/bin/env python3

import ev3, ev3_sound, task

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1

t = task.Repeated(
    jukebox.sound('./ui/DownloadSucces', duration=2).start,
    num=3
).start()
      
The output:

12:45:46.594595 Sent 0x|1D:00|2A:00|80|00:00|94:02:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
12:45:48.612966 Sent 0x|1D:00|2B:00|80|00:00|94:02:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
12:45:50.614405 Sent 0x|1D:00|2C:00|80|00:00|94:02:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
      

Method song

Now we add the lightshow to method song. First we add an attribute _pos_led to the constructor of class Jukebox:


class Jukebox(ev3.EV3):
    def __init__(self, protocol: str=None, host: str=None, ev3_obj=None):
        super().__init__(protocol=protocol, host=host, ev3_obj=ev3_obj)
        self._volume = 1
        self._temperament = 440
        self._pos_tone = None
        self._pos_led = None
        self._plays = False
      
Then we add the methods _init_color and _next_color to class Jukebox:

    def _init_color(self) -> None:
        self._pos_led = 0

    def _next_color(self, song) -> bool:
        if not self._plays:
            return True
        self.change_color(song["led_sequence"][self._pos_led])
        self._pos_led += 1
        self._pos_led %= len(song["led_sequence"])
      
We modify method stop:

    def stop(self) -> None:
        self.send_direct_cmd(ev3.opSound + ev3.BREAK)
        if self._plays:
            self._plays = False
            self.change_color(ev3.LED_GREEN)
      
We modify method song:

    def song(self, song: dict) -> task.Task:
        tones = task.concat(
            task.Task(self._init_tone),
            task.Repeated(
                self._next_tone,
                args=(song,)
            ),
            task.Task(self.stop)
        )
        colors = task.Periodic(
            60 * song["beats_per_bar"] / song["tempo"],
            self._next_color,
            args=(song,)
        )
        if "upbeat" in song:
            colors = task.concat(
                task.Sleep(60 * song["upbeat"] / song["tempo"]),
                colors
            )
        colors = task.concat(
            task.Task(self._init_color),
            colors
        )
        return task.concat(
            task.Task(tones.start),
            task.Task(colors.start),
            task.Task(tones.join)
        )
      
There is a little trick in this code. Up to now, we can't stop a Task object. Here we need to stop task colors when task tones is finished. This is done by method stop, which sets attribute _plays = False. Attribute _plays signals method _next_color to return value True. This isn't good style and we will simplify the code, when our tasks can be stopped. For the moment we are happy with the solution and we test it:

#!/usr/bin/env python3

import task, ev3, ev3_sound_tmp as ev3_sound

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1
task.concat(
    jukebox.song(ev3_sound.HAPPY_BIRTHDAY),
    task.Sleep(2),
    jukebox.song(ev3_sound.TRIAS)
).start()
      

Last Test

As the last test of this lesson, we pack a Periodic into a Periodic into a Periodic:


#!/usr/bin/env python3

import task, ev3, ev3_sound

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1

t_inner = task.Periodic(0.25, jukebox.play_tone, args=("c", 0.1), duration=1, num=4)
t_middle = task.Periodic(2, t_inner.start, num=2)
t_outer = task.Periodic(2, t_middle.start, num=2, netto_time=True)
t_outer.start()
      
its output:

12:54:55.427018 Sent 0x|0D:00|2A:00|80|00:00|94:01:01:82:83:00:81:64|
12:54:55.678632 Sent 0x|0D:00|2B:00|80|00:00|94:01:01:82:83:00:81:64|
12:54:55.928161 Sent 0x|0D:00|2C:00|80|00:00|94:01:01:82:83:00:81:64|
12:54:56.177827 Sent 0x|0D:00|2D:00|80|00:00|94:01:01:82:83:00:81:64|
12:54:57.427841 Sent 0x|0D:00|2E:00|80|00:00|94:01:01:82:83:00:81:64|
12:54:57.678454 Sent 0x|0D:00|2F:00|80|00:00|94:01:01:82:83:00:81:64|
12:54:57.928489 Sent 0x|0D:00|30:00|80|00:00|94:01:01:82:83:00:81:64|
12:54:58.178475 Sent 0x|0D:00|31:00|80|00:00|94:01:01:82:83:00:81:64|
12:55:00.430093 Sent 0x|0D:00|32:00|80|00:00|94:01:01:82:83:00:81:64|
12:55:00.680987 Sent 0x|0D:00|33:00|80|00:00|94:01:01:82:83:00:81:64|
12:55:00.930972 Sent 0x|0D:00|34:00|80|00:00|94:01:01:82:83:00:81:64|
12:55:01.181053 Sent 0x|0D:00|35:00|80|00:00|94:01:01:82:83:00:81:64|
12:55:02.430725 Sent 0x|0D:00|36:00|80|00:00|94:01:01:82:83:00:81:64|
12:55:02.681313 Sent 0x|0D:00|37:00|80|00:00|94:01:01:82:83:00:81:64|
12:55:02.931478 Sent 0x|0D:00|38:00|80|00:00|94:01:01:82:83:00:81:64|
12:55:03.181468 Sent 0x|0D:00|39:00|80|00:00|94:01:01:82:83:00:81:64|
      
Great, it works! Maybe you get the feeling, that using tasks will result in perls motto There's more than one way to do it. Yes I think so and I'm a bit concerned about it. But for the moment I see this freedom as a chance.

At the beginning of this lesson, we compared class Task and class Thread. We did not see any real advantage of class Task. Then we developed it further and it became a usefull tool to organize tasks.

Conclusion

We coded a family of task classes with the following relatives:

  • Task: encapsulates a callable, which will be executed once.
    
        class Task(builtins.object)
         |  Uses multithreading for tasks or chains of tasks.
         |  In standard case it's an action, which is executed by a single callable.
         |  Subsequent tasks or chains of tasks can be added with method append().
         |  
         |  Methods defined here:
         |  
         |  __init__(self, action, args:tuple=(), kwargs:dict={}, duration:float=None, join:bool=False)
         |      action: callable object (f.i. a function)
         |      args: argument list of action
         |      kwargs: keyword arguments of action
         |      duration: duration of task (if action returns earlier, task will wait)
         |      join: flag if contained task will be joined
       
  • Repeated: encapsulates a callable that will run multiple times.
    
        class Repeated(Task)
         |  Organizes repeated actions with multithreading (control comes back immediately).
         |  think of task as:
         |      while True:
         |          gap = action(*args, **kwargs)
         |          if gap is False or gap is None:
         |              pass
         |          elif gap is True or gap == -1:
         |              break
         |          else:
         |              time.sleep(gap)
         |  
         |  Methods defined here:
         |  
         |  __init__(self, action, args:tuple=(), kwargs:dict={}, num:int=0, duration:float=0, netto_time:bool=False)
         |      action: callable object, which is repeatedly called (f.i. a function)
         |                If callable it must return a number, a bool or None:
         |                True, -1: end the loop
         |                False, None: next call directly follows (if not reached limit of num)
         |                positive number: time gap between the actual and the next call
         |      args: argument list of action
         |      kwargs: keyword arguments of action
         |      num: number of calls (0 stands for unlimited)
         |      duration: duration of task (if execution ends earlier, task will wait)
         |      netto_time: flag, that waiting is netto (execution of action counts extra)
       
  • Periodic: encapsulates a callable to run it multiple times with a fixed time-intervall.
    
        class Periodic(Task)
         |  Uses multithreading for periodic actions (control comes back immediately).
         |  think of task as:
         |      while not action(*args, **kwargs):
         |          time.sleep(intervall)
         |  
         |  Methods defined here:
         |  
         |  __init__(self, intervall:float, action, args:tuple=(), kwargs:dict={}, num:int=0, duration:float=0, netto_time:bool=False)
         |      intervall: intervall between two calls of action (in seconds)
         |      action: callable object, which is repeatedly called (f.i. a function)
         |          It returns a bool or None:
         |              True: end the loop
         |              False, None: next call will follow (if not reached limit of num)
         |      args: argument list of action
         |      kwargs: keyword arguments of action
         |      num: number of calls (0 stands for unlimited)
         |      duration: duration of task (if execution ends earlier, task will wait)
         |      netto_time: flag, that waiting is netto (execution of action counts extra)
       
  • Sleep: sleeps
    
        class Sleep(Task)
         |  Sleeps
         |  
         |  Methods defined here:
         |  
         |  __init__(self, seconds:float)
         |      seconds: duration of sleeping
       
All of them are instances of class Task and can be combined (in any order) to build chains of tasks. Behind the scenes, they use multithreading, which allows starting parallel tasks (even inside a task one can start parallel tasks). All tasks can be parametrized for exact timing. This helps for a high level of time control. Locking also is done in the background. Only for seldom cases one needs to think about the locking mechanism.

task objects have the following methods:


     |  append(self, task) -> 'Task'
     |      appends a task or a chain of tasks (both must be root tasks)
     |  
     |  join(self) -> None
     |      joins the thread of the task 
     |      think as: my_task.thread.join(), but evaluated when called
     |  
     |  start(self) -> 'Task'
     |      starts execution of task (finished tasks may be started again)
      
Tasks have an easy to handle API and hide the details of multithreading and locking but they are flexible to use. They are independent from the EV3 device and can be used for multiple kinds of software projects, where multithreading is needed.

Tasks are both, architecture and glue. They allow to code callable atoms and then put them together to tasks with complex functionality. The construction of the task from its atoms often needs the thinking of an architect, but all resulting Task objects have the very same simple API. The users of Task objects need no knowledge of its inner structure.

For the moment, tasks can't be stopped and continued and their error handling must be improved. These will be topics of our next lesson. We will extend the methods to:


     |  append(self, task) -> 'Task'
     |      appends a task or a chain of tasks (both must be root tasks)
     |  
     |  cont(self, gap:float=0) -> 'Task'
     |      continues a stopped task (must be a root task)
     |      gap: sets the waiting time before the next action occurs (in seconds)
     |  
     |  join(self) -> None
     |      joins the thread of the task 
     |      think as: my_task.thread.join(), but evaluated when called
     |  
     |  start(self, gap:float=0) -> 'Task'
     |      starts execution of task (finished or stopped tasks may be started again)
     |      gap: sets the waiting time, before start occurs (in seconds)
     |  
     |  stop(self) -> None
     |      stops execution as fast as possible
     |          allows to continue with method cont or restart with method start
     |          already finished tasks silently do nothing
      

Now time has come to play around with the new tools. Modify your programs The depressed giraffe and The dancing robot and use tasks. Be creative and realize some of your own ideas and develop a feeling for the task concept. When you finished your playing, come back to lesson 9.