Monday, 1 August 2016

Lesson 11 - EV3's Filesystem

EV3 Direct commands - Lesson 11

Introduction

This lesson is about the filesystem of the EV3 device, which is a Linux computer with a verly low security level. We start with a warning: Direct commands (and system commands) have read- and write-access to the whole filesystem. This says that they can damage the heart of your EV3 device and leave it in a state, where is does not boot nor execute anything. If you have a EV3 device in this damaged state, you need to boot from a SD-Card and then repair the filesystem.

The filesystem allows to put resources to the EV3 device and then to reference them in direct commands. Resources can be byte code, sound, graphic, icons or data. All of them expand the EV3's spectrum of actions. Sound files allow to program speaking robots. Graphic files can be used to give your robot a face and so on. You know from lesson 2 how to play sound files, to display graphic files and to start executable programs. This lesson tells how to copy them from your local host to the EV3 device or vice versa. We also learn to create and delete directories or remove files.

Operation opFile

Operation opFile has 32 different CMDs but some important are missed. We first look at some of the existing ones, then we talk about the deficit.

GET_FOLDERS

This command reads the number of subdirectories of a given directory.

  • opFile = 0x|C0| with CMD GET_FOLDERS = 0x|0D|:
    Arguments
    • (Data8) NAME: First character in folder name (Ex. “../prjs/”)

    Returns
    • (Data8) NUMBER: No of sub folders
I tested it with this program:

#!/usr/bin/env python3

import ev3, struct

my_ev3 = ev3.EV3(
    protocol=ev3.USB,
    host='00:16:53:42:2B:99'
)
my_ev3.verbosity = 1

directory = '/'
ops = b''.join([
    ev3.opFile,
    ev3.GET_FOLDERS,
    ev3.LCS(directory),
    ev3.GVX(0)
])
reply = my_ev3.send_direct_cmd(ops, global_mem=1)
num = struct.unpack('<B', reply[5:])[0]
print(
    "Directory '{}' has {} subdirectories".format(
        directory,
        num
    )
)
      
and got this output:

11:46:29.588485 Sent 0x|0B:00|2A:00|00|02:00|C0:0D:84:2F:00:60|
11:46:29.636312 Recv 0x|05:00|2A:00|02|10|
Directory '/' has 16 subdirectories
      
The root directory of my EV3 device has 16 subdirectories.

GET_SUBFOLDER_NAME

This command reads the name of a subdirectory. The subdirectory is qualified by a number.

  • opFile = 0x|C0| with CMD GET_SUBFOLDER_NAME = 0x|0F|:
    Arguments
    • (Data8) NAME: First character in folder name (Ex. “../prjs/”)
    • (Data8) ITEM: Sub folder index [1..ITEMS]
    • (Data8) LENGTH: Maximal string length to read

    Returns
    • (Data8) STRING: Fist character of folder name (Character string)
I modified the test program:

#!/usr/bin/env python3

import ev3, struct

my_ev3 = ev3.EV3(
    protocol=ev3.USB,
    host='00:16:53:42:2B:99'
)

directory = '/'
ops = b''.join([
    ev3.opFile,
    ev3.GET_FOLDERS,
    ev3.LCS(directory),
    ev3.GVX(0)
])
reply = my_ev3.send_direct_cmd(ops, global_mem=1)
num = struct.unpack('<B', reply[5:])[0]
print(
    "Directory '{}' has {} subdirectories:".format(
        directory,
        num
    )
)
for i in range(num):
    ops = b''.join([
        ev3.opFile,
        ev3.GET_SUBFOLDER_NAME,
        ev3.LCS(directory),
        ev3.LCX(i + 1),         # ITEM
        ev3.LCX(64),            # LENGTH
        ev3.GVX(0)              # NAME
    ])
    reply = my_ev3.send_direct_cmd(ops, global_mem=64)
    subdir = struct.unpack('64s', reply[5:])[0]
    subdir = subdir.split(b'\x00')[0]
    subdir = subdir.decode("utf8")
    print("  {}".format(subdir))
      
and got this output:

Directory '/' has 16 subdirectories:
  bin
  boot
  dev
  etc
  home
  lib
  linuxrc
  media
  mnt
  opt
  proc
  sbin
  sys
  tmp
  usr
  var
      
No doubt, it's Linux! When I looked into directory /home/ I found one subdirectory: root. Only one user on the system. Everything, my EV3 device does, is done with root permissions, even the execution of the direct commands!

OPEN_READ, READ_BYTES and CLOSE

A file handle is a 8-bit reference, that allows access to a file. Command OPEN_READ creates and returns a file handle and binds it to the file, command CLOSE ends the binding of a file handle and therefore ends the access to the file.

With this knowledge, the plan to read a file is:

  • OPEN_READ: Creates a file handle and returns the handle and the size of the file.
  • multiple READ_BYTES: Uses the file handle and reads partitions of data that fit into the global memory of a direct command.
  • CLOSE: closes the file.

This plan doesn't work! The file handle looses its binding when the direct command ends. We can read small files, when we combine an OPEN_READ and a READ_BYTES into a single direct command, but there is no chance to read large files. We read the heading 1014 bytes of file /bin/usb-devices:


#!/usr/bin/env python3

import ev3, struct

my_ev3 = ev3.EV3(
    protocol=ev3.USB,
    host='00:16:53:42:2B:99'
)

file_name = "/bin/usb-devices"
ops = b''.join([
    ev3.opFile,
    ev3.OPEN_READ,
    ev3.LCS(file_name), # NAME
    ev3.LVX(0),         # HANDLE
    ev3.GVX(0),         # SIZE
    ev3.opFile,
    ev3.READ_BYTES,
    ev3.LVX(0),         # HANDLE
    ev3.LCX(1014),      # BYTES
    ev3.GVX(4),         # DESTINATION
])

reply = my_ev3.send_direct_cmd(ops, local_mem=1, global_mem=1018)
(size, data) = struct.unpack('<I1014s', reply[5:])
data = data.decode("utf8")
print("File: {}, size: {} bytes".format(file_name, size))
print(data)
      
The output:

File: /bin/usb-devices, size: 4202 bytes
#!/bin/bash

# Copyright: 2009 Greg Kroah-Hartman <greg@kroah.com>
#            2009 Randy Dunlap <rdunlap@xenotime.net>
#            2009 Frans Pop <elendil@planet.nl>
#
# This software may be used and distributed according to the terms of
# the GNU General Public License (GPL), version 2, or at your option
# any later version.

print_string() {
 file=$1
 name=$2
 if [ -f $file ]; then
  echo "S:  $name=`cat $file`"
 fi
}

class_decode() {
 local class=$1  # v4: in hex

 case $class in
 "00") echo ">ifc " ;;
 "01") echo "audio" ;;
 "02") echo "commc" ;;
 "03") echo "HID  " ;;
 "05") echo "PID  " ;;
 "06") echo "still" ;;
 "07") echo "print" ;;
 "08") echo "stor." ;;
 "09") echo "hub  " ;;
 "0a") echo "data " ;;
 "0b") echo "scard" ;;
 "0d") echo "c-sec" ;;
 "0e") echo "video" ;;
 "0f") echo "perhc" ;;
 "dc") echo "diagd" ;;
 "e0") echo "wlcon" ;;
 "ef") echo "misc " ;;
 "fe") echo "app. " ;;
 "ff") echo "vend." ;;
 "*")  echo "unk. " ;;
 esac
}

print_endpoint() {
 local eppath=$1

 addr=`cat $eppa
      
It's a shell-script.

OPEN_WRITE, OPEN_APPEND, WRITE_BYTES

Loosing the handles binding is no problem for writing. We can start with an OPEN_WRITE and follow with OPEN_APPEND, but there is another problem. Let's look at operation WRITE_BYTES:

  • opFile = 0x|C0| with CMD WRITE_BYTES = 0x|1D|:
    Arguments
    • (Data8) HANDLE: file handle
    • (Data8) BYTES: Number of bytes to write
    • (Data8) SOURCE: First byte in byte stream to write
Argument SOURCE is an address of the local or global memory. This says we first have to write the data into the memory, then we can write it into a file. There is no operation to write a byte stream to the memory, we have to split it in small units (f.i. operation opMove32_32 allows to write 4 bytes to the memory).

Deficits of direct commands

We found the following deficits:

  • Reading large files is not possible because the direct command runs in its own environment and its file handles loose the reference when the execution of a direct command ends.
  • Writing large files is not effective because we can't upload large partitions of data.
  • There is no operation to read the files in a directory.

System Commands

There are 15 system commands (decribed in the document EV3 Communication Developer Kit) that complete the direct commands. F.i. the system commands BEGIN_UPLOAD and CONTINUE_UPLOAD allow to read large files. They correspond to the above decribed situation. The main difference is the persistence of the file handle. BEGIN_UPLOAD creates and returns a file handle, which keeps its binding until CONTINUE_UPLOAD comes to the files end.

There are some differences between direct commands and system commands:

  • A system commands consists of one single command. You can't chain system commands.
  • System commands don't use local or global memory, there is no header.
  • The structure of system commands is:
    • Length of the message (bytes 0, 1)
    • Message counter (bytes 2, 3)
    • Message type (byte 4): It may have the following two values:
      • SYSTEM_COMMAND_REPLY = 0x|01|
      • SYSTEM_COMMAND_NO_REPLY = 0x|81|
    • System command with arguments (starting at byte 5): max. 1,019 bytes
  • System commands don't use LCS, LCX, LVX, GVX. Their arguments are separated by position. Strings are zero terminated.
  • The structure of the reply is:
    • Length of the reply (bytes 0, 1)
    • Message counter (bytes 2, 3)
    • Reply type (byte 4):
      • SYSTEM_REPLY = 0x|03|: the system command was successfully operated
      • SYSTEM_REPLY_ERROR = 0x|05|: the system command ended with an error.
    • Code of system command (byte 5)
    • Reply status (byte 6): success, type of error, additional information
    • Data (starting at byte 7): The return values of the system command (max. 1,017 bytes).
System commands focus on the filesystem. Some of the commands correspond to operations of direct commands (f.i. DELETE_FILE is an alternative to operation opFile with CMD REMOVE).

Module ev3

We add new methods and data to class EV3 so that it can send system commands to the EV3 device and receive its replies:


class EV3:
    ...
    def send_system_cmd(self, cmd: bytes, reply: bool=True) -> bytes:
        if reply:
            cmd_type = _SYSTEM_COMMAND_REPLY
        else:
            cmd_type = _SYSTEM_COMMAND_NO_REPLY
        self._lock.acquire()
        if self._msg_cnt < 65535:
            self._msg_cnt += 1
        else:
            self._msg_cnt = 1
        msg_cnt = self._msg_cnt
        self._lock.release()
        cmd = b''.join([
            struct.pack('<hh', len(cmd) + 3, msg_cnt),
            cmd_type,
            cmd
        ])
        if self._verbosity >= 1:
            now = datetime.datetime.now().strftime('%H:%M:%S.%f')
            print(now + \
                  ' Sent 0x|' + \
                  ':'.join('{:02X}'.format(byte) for byte in cmd[0:2]) + '|' + \
                  ':'.join('{:02X}'.format(byte) for byte in cmd[2:4]) + '|' + \
                  ':'.join('{:02X}'.format(byte) for byte in cmd[4:5]) + '|' + \
                  ':'.join('{:02X}'.format(byte) for byte in cmd[5:]) + '|' \
            )
        # pylint: disable=no-member
        if self._protocol in [BLUETOOTH, WIFI]:
            self._socket.send(cmd)
        elif self._protocol is USB:
            self._device.write(_EP_OUT, cmd, 100)
        else:
            raise RuntimeError('No EV3 connected')
        # pylint: enable=no-member
        counter = cmd[2:4]
        if not reply:
            return counter
        else:
            reply = self._wait_for_system_reply(counter)
            return reply

    def _wait_for_system_reply(self, counter: bytes) -> bytes:
        self._lock.acquire()
        reply = self._get_foreign_reply(counter)
        if reply:
            self._lock.release()
            if reply[4:5] != _SYSTEM_REPLY:
                raise SysCmdError("error: {:02X}".format(reply[6]))
            return reply
        if self._protocol == BLUETOOTH:
            time.sleep(0.1)
        while True:
            # pylint: disable=no-member
            if self._protocol in [BLUETOOTH, WIFI]:
                reply = self._socket.recv(1024)
            else:
                reply = bytes(self._device.read(_EP_IN, 1024, 0))
            # pylint: enable=no-member
            len_data = struct.unpack('<H', reply[:2])[0] + 2
            reply_counter = reply[2:4]
            if self._verbosity >= 1:
                now = datetime.datetime.now().strftime('%H:%M:%S.%f')
                print(now + \
                      ' Recv 0x|' + \
                      ':'.join('{:02X}'.format(byte) for byte in reply[0:2]) + \
                      '|' + \
                      ':'.join('{:02X}'.format(byte) for byte in reply[2:4]) + \
                      '|' + \
                      ':'.join('{:02X}'.format(byte) for byte in reply[4:5]) + \
                      '|' + \
                      ':'.join('{:02X}'.format(byte) for byte in reply[5:6]) + \
                      '|' + \
                      ':'.join('{:02X}'.format(byte) for byte in reply[6:7]) + \
                      '|', end='')
                if len_data > 7:
                    dat = ':'.join('{:02X}'.format(byte) for byte in reply[7:len_data])
                    print(dat + '|')
                else:
                    print()
            if counter != reply_counter:
                self._put_foreign_reply(reply_counter, reply[:len_data])
            else:
                self._lock.release()
                if reply[4:5] != _SYSTEM_REPLY:
                    raise SysCmdError("system command replied error: {:02X}".format(reply[6]))
                return reply[:len_data]

...
_SYSTEM_COMMAND_REPLY     = b'\x01'
_SYSTEM_COMMAND_NO_REPLY  = b'\x81'

_SYSTEM_REPLY             = b'\x03'
_SYSTEM_REPLY_ERROR       = b'\x05'

_SYSTEM_REPLY_OK          = b'\x00'
_SYSTEM_UNKNOWN_HANDLE    = b'\x01'
_SYSTEM_HANDLE_NOT_READY  = b'\x02'
_SYSTEM_CORRUPT_FILE      = b'\x03'
_SYSTEM_NO_HANDLES_AVAILABLE = b'\x04'
_SYSTEM_NO_PERMISSION     = b'\x05'
_SYSTEM_ILLEGAL_PATH      = b'\x06'
_SYSTEM_FILE_EXITS        = b'\x07'
_SYSTEM_END_OF_FILE       = b'\x08'
_SYSTEM_SIZE_ERROR        = b'\x09'
_SYSTEM_UNKNOWN_ERROR     = b'\x0A'
_SYSTEM_ILLEGAL_FILENAME  = b'\x0B'
_SYSTEM_ILLEGAL_CONNECTION= b'\x0C'

BEGIN_DOWNLOAD            = b'\x92'
CONTINUE_DOWNLOAD         = b'\x93'
BEGIN_UPLOAD              = b'\x94'
CONTINUE_UPLOAD           = b'\x95'
BEGIN_GETFILE             = b'\x96'
CONTINUE_GETFILE          = b'\x97'
CLOSE_FILEHANDLE          = b'\x98'
LIST_FILES                = b'\x99'
CONTINUE_LIST_FILES       = b'\x9A'
CREATE_DIR                = b'\x9B'
DELETE_FILE               = b'\x9C'
LIST_OPEN_HANDLES         = b'\x9D'
WRITEMAILBOX              = b'\x9E'
BLUETOOTHPIN              = b'\x9F'
ENTERFWUPDATE             = b'\xA0'
...
      
Annotations:
  • Method send_system_cmd with its internal method _wait_for_system_reply are modifications of their counterparts for direct commands.
  • System commands and direct commands use the same message counter and and stack of foreign replies.
  • There is no usage of sync_mode. Instead method send_system_cmd has an argument reply.
  • Blutooth seems to have problems with fast asking for replies of system commands, therefore we wait.

Reading Files

System commands BEGIN_UPLOAD and CONTINUE_UPLOAD

Reading a large file from EV3's filesystem needs these two system commands:

  • BEGIN_UPLOAD = 0x|94|:
    Arguments
    • (Data16) SIZE: length of the first partition (in bytes, max. 1,012)
    • (DataX) NAME: zero-terminated path to the file (including the filename). If the path doesn't start with a backslash (f.i. ./... or ../... or ui/...), it's relative from /home/root/lms2012/sys/.

    Returns
    • (Data32) SIZE: size of the file (in bytes)
    • (Data8) HANDLE: file handle
    • (DataX) DATA: first partition of data
  • CONTINUE_UPLOAD = 0x|95|:
    Arguments
    • (Data8) HANDLE: file handle
    • (Data16) SIZE: length of the partition (in bytes, max. 1,016)

    Returns
    • (Data8) HANDLE: file handle
    • (DataX) DATA: partition of data

Class FileSystem

We add a new module ev3_file with a class FileSystem that reads the data from a file:


#!/usr/bin/env python3

import struct
import ev3

class FileSystem(ev3.EV3):
    def write_file(self, path: str, data: bytes) -> None:
        size = len(data)
        cmd = b''.join([
            ev3.BEGIN_DOWNLOAD,
            struct.pack('<I', size),      # SIZE
            str.encode(path) + b'\x00'    # NAME
        ])
        reply = self.send_system_cmd(cmd)
        handle = struct.unpack('B', reply[7:8])[0]
        rest = size
        while rest > 0:
            part_size = min(1017, rest)
            pos = size - rest
            fmt = 'B' + str(part_size) + 's'
            cmd = b''.join([
                ev3.CONTINUE_DOWNLOAD,
                struct.pack(fmt, handle, data[pos:pos+part_size]) # HANDLE, DATA
            ])
            self.send_system_cmd(cmd)
            rest -= part_size

    def read_file(self, path: str) -> bytes:
        cmd = b''.join([
            ev3.BEGIN_UPLOAD,
            struct.pack('<H', 1012),      # SIZE
            str.encode(path) + b'\x00'    # NAME
        ])
        reply = self.send_system_cmd(cmd)
        (size, handle) = struct.unpack('<IB', reply[7:12])
        part_size = min(1012, size)
        if part_size > 0:
            fmt = str(part_size) + 's'
            data = struct.unpack(fmt, reply[12:])[0]
        else:
            data = b''
        rest = size - part_size
        while rest > 0:
            part_size = min(1016, rest)
            cmd = b''.join([
                ev3.CONTINUE_UPLOAD,
                struct.pack('<BH', handle, part_size) # HANDLE, SIZE
            ])
            reply = self.send_system_cmd(cmd)
            fmt = 'B' + str(part_size) + 's'
            (handle, part) = struct.unpack(fmt, reply[7:])
            data += part
            rest -= part_size
            if rest <= 0 and reply[6:7] != ev3.SYSTEM_END_OF_FILE:
                raise SysCmdError("end of file not reached")
        return data
      

Test

This helps to read large files. We read file /bin/usb-devices with this program:


#!/usr/bin/env python3

import ev3, ev3_file, struct, hashlib

file_sys = ev3_file.FileSystem(
    protocol=ev3.USB,
    host='00:16:53:42:2B:99'
)
file_sys.verbosity = 1

path = "/bin/usb-devices"
data = file_sys.read_file(path)
print(data.decode("utf8"))
print("md5:", hashlib.md5(data).hexdigest().upper())
      
The output:

09:24:08.466537 Sent 0x|17:00|2A:00|01|94:F4:03:2F:62:69:6E:2F:75:73:62:2D:64:65:76:69:63:65:73:00|
09:24:08.473383 Recv 0x|FE:03|2A:00|03|94|00|6A:10:00:00:00:23:21:2F:62:69:6E:2F:62:61:73:68:0A:0A:...
09:24:08.476595 Sent 0x|07:00|2B:00|01|95:00:F8:03|
09:24:08.479475 Recv 0x|FE:03|2B:00|03|95|00|00:70:61:74:68:2F:62:45:6E:64:70:6F:69:6E:74:41:64:64:...

...

09:24:08.499552 Sent 0x|07:00|2E:00|01|95:00:8E:00|
09:24:08.501479 Recv 0x|94:00|2E:00|03|95|08|00:63:74:6F:72:79:20:2F:73:79:73:2F:62:75:73:20:64:6F:...
#!/bin/bash

# Copyright: 2009 Greg Kroah-Hartman <greg@kroah.com>
#            2009 Randy Dunlap <rdunlap@xenotime.net>
#            2009 Frans Pop <elendil@planet.nl>
#
# This software may be used and distributed according to the terms of
# the GNU General Public License (GPL), version 2, or at your option
# any later version.

print_string() {
 file=$1
 name=$2
 if [ -f $file ]; then
  echo "S:  $name=`cat $file`"
 fi
}

...

for device in /sys/bus/usb/devices/usb*
do
 print_device $device 0 0 0
done

md5: 5E78E1B8C0E1E8CB73FDED5DE384C000
      
The program red the whole file of 4,202 bytes (1,012 + 3 * 1,016 + 142 bytes) and the file handle was 0x|00|. I calculated the md5 hash of the file. Wait a little moment and you will realize, why.

Reading Directories

System commands LIST_FILES and CONTINUE_LIST_FILES

Reading a directory needs these two system commands:

  • LIST_FILES = 0x|99|:
    Arguments
    • (Data16) SIZE: length of the first partition (in bytes, max. 1,012)
    • (DataX) NAME: zero-terminated path to the directory (optionally including a final backslash). If the path doesn't start with a backslash (f.i. ./... or ../... or ui/...), it's relative from /home/root/lms2012/sys/.

    Returns
    • (Data32) SIZE: size of the directory data (in bytes)
    • (Data8) HANDLE: file handle
    • (DataX) DATA: first partition of data
  • CONTINUE_LIST_FILES = 0x|9A|:
    Arguments
    • (Data8) HANDLE: file handle
    • (Data16) SIZE: length of the partition (in bytes, max. 1,016)

    Returns
    • (Data8) HANDLE: file handle
    • (DataX) DATA: partition of data

The data consist of lines. Every line stands for a subdirectory or a file. Subdirectories are a single string, that ends with a backslash, the line of a file consists of three parts:

  • the md5 hash of the file (32 bytes),
  • the size of the file, a 4 bytes integer (big endian) written as a string (8 bytes),
  • the name of the file.
The parts are separated by a single blank.

The data of directory /bin:


zcat/
watch/
vi/
usleep/
5E78E1B8C0E1E8CB73FDED5DE384C000 0000106A usb-devices
uname/
15C768916AB69D5E49BA2B55984AB644 00008F64 umount.util-linux-ng
umount/

...
      
You can compare the md5 hash with the one, we calculated above. The md5 hash allows to control the correctness of the reading and writing.

Adding method list_dir to class FileSystem

We add a method to our class, that reads the content of a directory and returns a dictionary (which is JSON):


    def list_dir(self, path: str) -> dict:
        cmd = b''.join([
            ev3.LIST_FILES,
            struct.pack('<H', 1012),      # SIZE
            str.encode(path) + b'\x00'    # NAME
        ])
        reply = self.send_system_cmd(cmd)
        (size, handle) = struct.unpack('<IB', reply[7:12])
        part_size = min(1012, size)
        if part_size > 0:
            fmt = str(part_size) + 's'
            data = struct.unpack(fmt, reply[12:])[0]
        else:
            data = b''
        rest = size - part_size
        while rest > 0:
            part_size = min(1016, rest)
            cmd = b''.join([
                ev3.CONTINUE_LIST_FILES,
                struct.pack('<BH', handle, part_size) # HANDLE, SIZE
            ])
            reply = self.send_system_cmd(cmd)
            fmt = 'B' + str(part_size) + 's'
            (handle, part) = struct.unpack(fmt, reply[7:])
            data += part
            rest -= part_size
            if rest <= 0 and reply[6:7] != ev3.SYSTEM_END_OF_FILE:
                raise SysCmdError("end of file not reached")
        folders = []
        files = []
        for line in data.split(sep=b'\x0A'):
            if line == b'':
                pass
            elif line.endswith(b'\x2F'):
                folders.append(line.rstrip(b'\x2F').decode("utf8"))
            else:
                (md5, size_hex, name) = line.split(None, 2)
                size = int(size_hex, 16)
                files.append({
                    'md5': md5.decode("utf8"),
                    'size': size,
                    'name': name.decode("utf8")
                })
        return {'files': files, 'folders': folders}
 
Annotations:
  • The first part is very similar to method read_file.
  • The second part creates the dictionary, which is returned.
  • The dictionary has the following structure:
    
              {'folders': ['subfolder1', 'subfolder2', ...]
               'files': [{'size': 4202,
                          'name': 'usb-devices',
                          'md5': '5E78E1B8C0E1E8CB73FDED5DE384C000'}, ...]}
         
    folders and files are separated, all information about the files is included, but the size is an integer.

Test

We recursively read directory /home/ with this program:


#!/usr/bin/env python3

import ev3, ev3_file

my_ev3 = ev3_file.FileSystem(
    protocol=ev3.USB,
    host='00:16:53:42:2B:99'
)

def read_dir(path: str) -> None:
    print(path)
    try:
        content = my_ev3.list_dir(path)
    except ev3_file.SysCmdError:
        print("error in {}".format(path))
        return
    for file in content['files']:
        print(' '*2 + file['name'])
    for folder in content['folders']:
        if folder in  [
                '.',
                '..',
        ]:
            pass
        else:
            read_dir(path + folder + '/')
    
read_dir('/home/')
      
The output:

/home/
/home/root/
/home/root/lms2012/
  lms2012
/home/root/lms2012/tools/
/home/root/lms2012/tools/WiFi/
  icon.rgf
  WiFi.rbf
  GeneralAlarm.rsf
  Connect.rsf
  144x99_POP5.rgf
  144x82_POP4.rgf
  144x65_POP3.rgf
  144x48_POP2.rgf
  144x116_POP6.rgf

...

/home/root/lms2012/apps/Brick Program/CVS/
      
This is the part of the filesystem, where all your projects and their resources are stored. You will find your history with your EV3 and the dialog structure in these directories. For details: Folder Structure.

Writing Files

System commands BEGIN_DOWNLOAD and CONTINUE_DOWNLOAD

Writing data into a file on the EV3 needs two system commands:

  • BEGIN_DOWNLOAD = 0x|92|:
    Arguments
    • (Data32) SIZE: length of the file (in bytes)
    • (DataX) NAME: zero-terminated path to the file (optionally including a final backslash). If the path doesn't start with a backslash (f.i. ./... or ../... or ui/...), it's relative from /home/root/lms2012/sys/.

    Returns
    • (Data8) HANDLE: file handle
  • CONTINUE_DOWNLOAD = 0x|93|:
    Arguments
    • (Data8) HANDLE: file handle
    • (DataX) DATA: partition of data (max. 1,018)

    Returns
    • (Data8) HANDLE: file handle

Adding method write_file to class FileSystem

We add a method to our class, that writes data into a file. If the file exists, it replaces it, if not it creates directory and file.:


    def write_file(self, path: str, data: bytes) -> None:
        size = len(data)
        cmd = b''.join([
            ev3.BEGIN_DOWNLOAD,
            struct.pack('<I', size),      # SIZE
            str.encode(path) + b'\x00'    # NAME
        ])
        reply = self.send_system_cmd(cmd)
        handle = struct.unpack('B', reply[7:8])[0]
        rest = size
        while rest > 0:
            part_size = min(1017, rest)
            pos = size - rest
            fmt = 'B' + str(part_size) + 's'
            cmd = b''.join([
                ev3.CONTINUE_DOWNLOAD,
                struct.pack(fmt, handle, data[pos:pos+part_size]) # HANDLE, DATA
            ])
            self.send_system_cmd(cmd)
            rest -= part_size
 
Annotations:
  • BEGIN_DOWNLOAD tests, if the path starts with /home/root/lms2012/apps, /home/root/lms2012/prjs or /home/root/lms2012/tools. This helps for security aspects. If you need to write to another path, use direct commands (opFile with CMDs OPEN_WRITE, OPEN_APPEND, WRITE_BYTES).
  • The max. size of a partition is 1,017 instead of 1,018. When I tested the method with the full length, it wrote incomplete files.
  • When I used SYSTEM_COMMAND_NO_REPLY for CONTINUE_DOWNLOAD, I got problems (again with incomplete files), maybe USB is too fast.

Test

We write a new file, a copy of the sound file T-rex roar.rsf and we control the md5 hash:


#!/usr/bin/env python3

import ev3, ev3_file, hashlib

file_sys = ev3_file.FileSystem(
    protocol=ev3.USB,
    host='00:16:53:42:2B:99'
)

data = open("./T-rex roar.rsf", 'rb').read()
file_sys.write_file("/home/root/lms2012/apps/tmp/T-rex roar.rsf", data)
print(file_sys.list_dir("/home/root/lms2012/apps/tmp"))
print("md5:", hashlib.md5(data).hexdigest().upper())
      
The output:

{'files': [{'name': 'T-rex roar.rsf', 'md5': 'F4FA8456F6004859FFF34A5667ACD781', 'size': 15521}], 'folders': ['..', '.']}
md5: F4FA8456F6004859FFF34A5667ACD781
      
Correct md5 hash, writing is o.k.

Deleting Files and Directories

System command DELETE_FILE

Deleting a file is done by the system command:

  • DELETE_FILE = 0x|9C|:
    Arguments
    • (DataX) NAME: zero-terminated path to the file. If the path doesn't start with a backslash (f.i. ./... or ../... or ui/...), it's relative from /home/root/lms2012/sys/.

    Returns
    • None
This command also allows to delete empty directories.

Adding method del_file to class FileSystem

We add this method to our class:


    def del_file(self, path: str) -> None:
        cmd = b''.join([
            ev3.DELETE_FILE,
            str.encode(path) + b'\x00'    # NAME
        ])
        self.send_system_cmd(cmd)

    def del_dir(self, path: str, secure: bool=True) -> None:
        if secure:
            self.del_file(path)
        else:
            if path.endswith("/"):
                path = path[:-1]
            parent_path = path.rsplit("/", 1)[0] + "/"
            folder = path.rsplit("/", 1)[1]
            ops = b''.join([
                ev3.opFile,
                ev3.GET_FOLDERS,
                ev3.LCS(parent_path),
                ev3.GVX(0)
            ])
            reply = self.send_direct_cmd(ops, global_mem=1)
            num = struct.unpack('B', reply[5:])[0]
            found = False
            for i in range(num):
                ops = b''.join([
                    ev3.opFile,
                    ev3.GET_SUBFOLDER_NAME,
                    ev3.LCS(parent_path),
                    ev3.LCX(i + 1),         # ITEM
                    ev3.LCX(64),            # LENGTH
                    ev3.GVX(0)              # NAME
                ])
                reply = self.send_direct_cmd(ops, global_mem=64)
                subdir = struct.unpack('64s', reply[5:])[0]
                subdir = subdir.split(b'\x00')[0]
                subdir = subdir.decode("utf8")
                if subdir == folder:
                    found = True
                    ops = b''.join([
                        ev3.opFile,
                        ev3.DEL_SUBFOLDER,
                        ev3.LCS(parent_path), # NAME
                        ev3.LCX(i + 1)        # ITEM
                    ])
                    self.send_direct_cmd(ops)
                    break
            if not found:
                raise ev3.DirCmdError("Folder " + path + " doesn't exist")
      
Annotations:
  • del_dir with argument secure=False uses direct commands and allows to delete directories which are not empty.
  • The comparison shows, that system commands are comfortable alternatives to direct commands.

Test

We delete the file and directory, we have created:


#!/usr/bin/env python3

import ev3, ev3_file, hashlib

file_sys = ev3_file.FileSystem(
    protocol=ev3.USB,
    host='00:16:53:42:2B:99'
)

file_sys.del_file("/home/root/lms2012/apps/tmp/T-rex roar.rsf")
file_sys.del_dir("/home/root/lms2012/apps/tmp")
print(file_sys.list_dir("/home/root/lms2012/apps"))
      
The output:

{'folders': ['Motor Control', 'Port View', 'IR Control', 'Brick Program', '..', '.'], 'files': ...
      
App tmp disappeared, o.k.

Loading resources and using them

It's time to put the things together. We start a little program, that loads a sound file and an graphic file to the EV3, uses and deletes them:


#!/usr/bin/env python3

import ev3, ev3_file, ev3_sound, task

file_sys = ev3_file.FileSystem(
    protocol=ev3.USB,
    host='00:16:53:42:2B:99'
)
jukebox = ev3_sound.Jukebox(ev3_obj=file_sys)
jukebox.volume=100

path_app = "../apps/tmp"
sound_local = "./Thank you.rsf"
sound_ev3 = path_app + "/Thank you.rsf"
image_local = "./Smile.rgf"
image_ev3 = path_app + "/Smile.rgf"

def load_resources() -> None:
    data_sound = open(sound_local, 'rb').read()
    file_sys.write_file(sound_ev3, data_sound)
    data_image = open(image_local, 'rb').read()
    file_sys.write_file(image_ev3, data_image)

def remove_resources() -> None:
    file_sys.del_dir(path_app, secure=False)

ops_smile = b''.join([
    ev3.opUI_Draw,
    ev3.BMPFILE,
    ev3.LCX(1),         # COLOR
    ev3.LCX(0),         # X0
    ev3.LCX(0),         # Y0
    ev3.LCS(image_ev3), # NAME
    ev3.opUI_Draw,
    ev3.UPDATE
])

ops_empty = b''.join([
    ev3.opUI_Draw,
    ev3.FILLWINDOW,
    ev3.LCX(0),         # COLOR
    ev3.LCX(0),         # Y0
    ev3.LCX(0),         # Y1
    ev3.opUI_Draw,
    ev3.UPDATE
])

t = task.concat(
    task.Task(
        load_resources,
        action_stop=remove_resources
    ),
    task.Task(
        file_sys.send_direct_cmd,
        args=(ops_smile,)
    ),
    jukebox.sound(sound_ev3.rsplit('.', 1)[0], duration=0.7),
    task.Task(
        file_sys.send_direct_cmd,
        args=(ops_empty,),
        duration=0.01
    ),
    task.Task(remove_resources)
)
t.start()
      
Remarks:
  • This sound file is part of the LEGO MINDSTORMS EV3 Home Edition. You find file Thank you.rsf in .../LEGO MINDSTORMS EV3 Home Edition/Resources/BrickResources/Retail/Sounds/files/Communication.
  • Dito the grahic file Smile.rgf. You find it in .../LEGO MINDSTORMS EV3 Home Edition/Resources/BrickResources/Retail/Images/files/Expressions.
  • We create a temporary folder /home/root/lms2012/apps/tmp, where we load the resources in.
  • This folder and its content are removed at the end of the task.
  • Stopping the task, also removes the temporary resources.

Conclusion

We coded another subclass of EV3 and added module ev3_file which has the following API:


Help on module ev3_file:

NAME
    ev3_file - access on EV3's filesystem

CLASSES
    builtins.Exception(builtins.BaseException)
        SysCmdError
    ev3.EV3(builtins.object)
        FileSystem
    
    class FileSystem(ev3.EV3)
     |  Works with EV3's filesystem
     |  
     |  Method resolution order:
     |      FileSystem
     |      ev3.EV3
     |      builtins.object
     |  
     |  Methods defined here:
     |  
     |  copy_file(self, path_source:str, path_dest:str) -> None
     |      Copies a file in the EV3's file system from
     |      its old location to a new one
     |      (no error if the file doesn't exist)
     |      
     |      Attributes:
     |      path_source: absolute or relative path (from "/home/root/lms2012/sys/") of the existing file
     |      path_dest: absolute or relative path of the new file
     |  
     |  create_dir(self, path:str) -> None
     |      Create a directory on EV3's file system
     |      
     |      Attributes:
     |      path: absolute or relative path (from "/home/root/lms2012/sys/")
     |  
     |  del_dir(self, path:str, secure:bool=True) -> None
     |      Delete a directory on EV3's file system
     |      
     |      Attributes:
     |      path: absolute or relative path (from "/home/root/lms2012/sys/")
     |      secure: flag, if the directory may be not empty
     |  
     |  del_file(self, path:str) -> None
     |      Delete a file from the EV3's file system
     |      
     |      Attributes:
     |      path: absolute or relative path (from "/home/root/lms2012/sys/") of the file
     |  
     |  list_dir(self, path:str) -> dict
     |      Read one of EV3's directories
     |      
     |      Attributes:
     |      path: absolute or relative path to the directory (f.i. "/bin")
     |      
     |      Returns:
     |      dict, that holds subfolders and files.
     |        folders as an array of strings (names)
     |        files as an array of dictionaries
     |        {'folders': ['subfolder1', 'subfolder2', ...]
     |         'files': [{'size': 4202,
     |                    'name': 'usb-devices',
     |                    'md5': '5E78E1B8C0E1E8CB73FDED5DE384C000'}, ...]}
     |  
     |  read_file(self, path:str) -> bytes
     |      Read one of EV3's files
     |      
     |      Attributes:
     |      path: absolute or relative path to file (f.i. "/bin/sh")
     |  
     |  send_system_cmd(self, cmd:bytes, reply:bool=True) -> bytes
     |      Send a system command to the LEGO EV3
     |      
     |      Arguments:
     |      cmd: holds netto data only (cmd and arguments), the following fields are added:
     |        length: 2 bytes, little endian
     |        counter: 2 bytes, little endian
     |        type: 1 byte, SYSTEM_COMMAND_REPLY or SYSTEM_COMMAND_NO_REPLY
     |      
     |      Keywor Arguments:
     |      reply: flag if with reply
     |      
     |      Returns: 
     |        reply (in case of SYSTEM_COMMAND_NO_REPLY: counter)
     |  
     |  write_file(self, path:str, data:bytes) -> None
     |      Write a file to the EV3's file system
     |      
     |      Attributes:
     |      path: absolute or relative path (from "/home/root/lms2012/sys/") of the file
     |      data: data to write into the file
     |  
     |  ----------------------------------------------------------------------
     |  Methods inherited from ev3.EV3:
     |  
        ...
      

This allows to work with resources. We have picked a few of them from the LEGO MINDSTORMS EV3 Home Edition and it's obvious, that a large collection of resources allows great applications. Let's only think of a speaking robot with a real vocabulary.

If you need to convert grahic or sound to the format, the EV3 understands, take a look at ImageMagick to create graphic files or wavrsocvt to create sound files.

Friday, 10 June 2016

Lesson 10 - Self-driving vehicle

EV3 Direct commands - Lesson 08

Introduction

The last three lessons were about classes, which organize tasks. This lesson is another one about class TwoWheelVehicle, that represents vehicles with two drived wheels. We will make it compatible to the task concept At its end we will realize a vehicle, that follows a course and immediately stops, when its sensor detects a barrier and continues the driving, when the barrier disappeares. The whole algorithm of this self driving vehicle will be a task object.

This all will be done step by step. We train our vehicle to stop immediately and to continue correct. At the end we will construct tasks of a new level of complexity.

The task factory

We add some constants to class TwoWheelVehicle:


DRIVE_TYPE_STRAIGHT = "straight"
DRIVE_TYPE_TURN = "turn"
DRIVE_TYPE_ROTATE_TO = "rotate_to"
DRIVE_TYPE_DRIVE_TO = "drive_to"
DRIVE_TYPE_STOP = "stop"
      
We add a new method:

    def task_factory(self, drive_type: str, **kwargs) -> task.Task:
        speed = kwargs.pop('speed', None)
        distance = kwargs.pop('distance', None)
        radius_turn = kwargs.pop('radius_turn', None)
        angle = kwargs.pop('angle', None)
        right_turn = kwargs.pop('right_turn', False)
        orientation = kwargs.pop('orientation', None)
        pos_x = kwargs.pop('pos_x', None)
        pos_y = kwargs.pop('pos_y', None)
        brake = kwargs.pop('brake', False)
        exc = kwargs.pop('exc', None)
        if drive_type == DRIVE_TYPE_STRAIGHT:
            return task.Task(
                self.drive_straight,
                args=(speed, distance),
                exc=exc
            )
        elif drive_type == DRIVE_TYPE_TURN:
            return task.Task(
                self.drive_turn,
                args=(speed, radius_turn, angle, right_turn),
                exc=exc
            )
        elif drive_type == DRIVE_TYPE_ROTATE_TO:
            return task.Task(
                self.rotate_to,
                args=(speed, orientation),
                exc=exc
            )
        elif drive_type == DRIVE_TYPE_DRIVE_TO:
            return task.Task(
                self.drive_to,
                args=(speed, pos_x, pos_y),
                exc=exc
            )
        elif drive_type == DRIVE_TYPE_STOP:
            return task.Task(
                self.stop,
                args=(brake,),
                exc=exc
            )
      
We test it:

#!/usr/bin/env python3

import ev3, ev3_vehicle, task, time

vehicle = ev3_vehicle.TwoWheelVehicle(
    0.02128,                 # radius_wheel
    0.1175,                  # tread
    protocol=ev3.BLUETOOTH,
    host='00:16:53:42:2B:99'
)
speed = 25

drive = task.concat(
    vehicle.task_factory("turn", speed, radius_turn=0.25, angle=60),
    vehicle.task_factory("straight", speed, distance=0.2),
    vehicle.task_factory("rotate_to", speed, orientation=0),
    vehicle.task_factory("stop", brake=True),
    task.Sleep(0.5),
    vehicle.task_factory("stop", brake=False)
)
drive.start()
      
It works, the vehicle does its three moves, then stops hard (with brake). drive.start() returns immediately, which allows to start something parallel. But what happens, when we call method stop? Task objects don't know, that vehicles need an explicit call of method stop.

Stopping a vehicle

We change method task_factory:


    def task_factory(self, drive_type: str, **kwargs) -> task.Task:
        speed = kwargs.pop('speed', None)
        distance = kwargs.pop('distance', None)
        radius_turn = kwargs.pop('radius_turn', None)
        angle = kwargs.pop('angle', None)
        right_turn = kwargs.pop('right_turn', False)
        orientation = kwargs.pop('orientation', None)
        pos_x = kwargs.pop('pos_x', None)
        pos_y = kwargs.pop('pos_y', None)
        brake = kwargs.pop('brake', False)
        exc = kwargs.pop('exc', None)
        if drive_type == DRIVE_TYPE_STRAIGHT:
            t = task.Task(
                self.drive_straight,
                args=(speed, distance),
                action_stop=self.stop,
                exc=exc
            )
        elif drive_type == DRIVE_TYPE_TURN:
            t = task.Task(
                self.drive_turn,
                args=(speed, radius_turn, angle, right_turn),
                action_stop=self.stop,
                exc=exc
            )
        elif drive_type == DRIVE_TYPE_ROTATE_TO:
            t = task.Task(
                self.rotate_to,
                args=(speed, orientation),
                action_stop=self.stop,
                exc=exc
            )
        elif drive_type == DRIVE_TYPE_DRIVE_TO:
            t = task.Task(
                self.drive_to,
                args=(speed, pos_x, pos_y),
                action_stop=self.stop,
                exc=exc
            )
        elif drive_type == DRIVE_TYPE_STOP:
            t = task.Task(
                self.stop,
                args=(brake,),
                exc=exc
            )
        return task.Task(t.start, join=True, exc=exc)
      
This makes all movements to contained tasks. They are root tasks and will stay root tasks, even when the returned task object is appended to another task object. We already discussed it in lesson 9.

We modify the test program:


#!/usr/bin/env python3

import ev3, ev3_vehicle, task, time

vehicle = ev3_vehicle.TwoWheelVehicle(
    0.02128,                 # radius_wheel
    0.1175,                  # tread
    protocol=ev3.BLUETOOTH,
    host='00:16:53:42:2B:99'
)
speed = 25

drive = task.concat(
    vehicle.task_factory("turn", speed, radius_turn=0.25, angle=60),
    vehicle.task_factory("straight", speed, distance=0.2),
    vehicle.task_factory("rotate_to", speed, orientation=0),
    vehicle.task_factory("stop", brake=True),
    task.Sleep(0.5),
    vehicle.task_factory("stop", brake=False)
)
drive.start()
time.sleep(1)
drive.stop()
      
It turns, then stops, this is, what we expected. The call of method stop immediately returns, but the movement lasts longer. This type of stopping is not what we target. The perfect stopping reacts prompt.

Fast stopping of drive_straight

Actually, the smallest unit of our tasks are the methods drive_straight, drive_turn, rotate_to and drive_to. If we want a fast stopping, we need to split them up into smaller units. F.i. we can split up drive_straight into a chain of tasks, which consits of a Task object and a Repeated object. This allows to stop while Repeated executes with the option to continue it. First we take a look into the actual code of method drive_straight:


    def drive_straight(self, speed: int, distance: float) -> None:
        ...
        self.move_straight(speed)
        step = round(distance * 360 / (2 * math.pi * self._radius_wheel))
        direction = math.copysign(1, speed * self._polarity)
        pos_final = [self._pos[0] + direction * step,
                     self._pos[1] + direction * step]
        while True:
            value = self._test_pos(direction, pos_final)
            if value == -1:
                break
            time.sleep(value)
      
It seems easy to realize the lower part with a Repeated object. We will code a chain of tasks, the first part will be a Task object, the second a Repeated. But we need to solve the problem with the arguments of method _test_pos. They are not known, when the Repeated task is constructed. Therefore we add a new attribute to class TwoWheelVehicle:

    def __init__(self, radius_wheel: float, tread: float,
                 protocol: str=None,
                 host: str=None,
                 ev3_obj: ev3.EV3=None):
        super().__init__(protocol=protocol,
                         host=host,
                         ev3_obj=ev3_obj)
        ...
        self._test_args = None
      

We change method _test_pos from:


    def _test_pos(
            self,
            direction,
            pos_final: list
    ) -> float:
        if self._to_stop:
        ...
      
to:

    def _test_pos(self) -> float:
        (direction, pos_final) = self._test_args
        if self._to_stop:
        ...
      
and method drive_straight to:

    def drive_straight(self, speed: int, distance: float) -> None:
        ...
        self._test_pos_args = (direction, pos_final)
        while True:
            value = self._test_pos()
            if value == -1:
                break
            time.sleep(value)
      
This solves the problem with the arguments! Now we can write a new method _drive_straight:

    def _drive_straight(self, speed: int, distance: float) -> None:
        self.move_straight(speed)
        step = round(distance * 360 / (2 * math.pi * self._radius_wheel))
        direction = math.copysign(1, speed * self._polarity)
        pos_final = [self._pos[0] + direction * step,
                     self._pos[1] + direction * step]
        self._test_pos_args = (direction, pos_final)
      
and we modify method task_factory:

    def task_factory(
            ...
    ) -> task.Task:
        ...
        if drive_type == DRIVE_TYPE_STRAIGHT:
            t = task.Task(
                self._drive_straight,
                args=(speed, distance),
                action_stop=self.stop,
                exc=exc
            )
            if distance != None:
                t.append(task.Repeated(self._test_pos))
        ...
      
The first part, the Task object, calls method _drive_straight, which starts the movement and calculates the arguments of method _test_pos. The second part is a Repeated, that ends, when the final position is reached.

We test the new code with this program:


#!/usr/bin/env python3

import ev3, ev3_vehicle, task, time

vehicle = ev3_vehicle.TwoWheelVehicle(
    0.02128,                 # radius_wheel
    0.1175,                  # tread
    protocol=ev3.BLUETOOTH,
    host='00:16:53:42:2B:99'
)
vehicle.verbosity = 1
speed = 25

drive = vehicle.task_factory("straight", speed, distance=0.5)
drive.start()
time.sleep(1)
drive.stop()
      
its output:

21:22:47.812922 Sent 0x|1F:00|2A:00|00|08:00|B0:00:09:19:00:00:00:A6:00:09:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
21:22:47.860821 Recv 0x|0B:00|2A:00|02|B0:0C:00:00:C0:0C:00:00|
21:22:47.962883 Sent 0x|15:00|2B:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
21:22:48.004716 Recv 0x|0B:00|2B:00|02|CC:0C:00:00:DC:0C:00:00|
21:22:48.250600 Sent 0x|15:00|2C:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
21:22:48.300776 Recv 0x|0B:00|2C:00|02|17:0D:00:00:27:0D:00:00|
21:22:48.814450 Sent 0x|19:00|2D:00|00|08:00|A3:00:09:00:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
21:22:48.876683 Recv 0x|0B:00|2D:00|02|AC:0D:00:00:BB:0D:00:00|
      
The stopping occured one second after the start of the movement, not when the whole distance was moved. Fast stopping is realized!

Continuing drive_straight

When we call method stop, we stop the motors. We have to restart them, when we continue. This is, what the task's argument action_cont is thought for. If you look inside method _test_pos, you will realize a second problem. This method calculates the remaining time from the time gap since its last call. Stopping and continuing will disturb this algorithm. Therefore continuation needs to act like the start of a new movement. We add a new attribute to class TwoWheelVehicle:


    def __init__(self, radius_wheel: float, tread: float,
                 protocol: str=None,
                 host: str=None,
                 ev3_obj: ev3.EV3=None):
        ...
        self._turn = None
        self._speed = None
        ...
      

We add a line to method move:


    def move(self, speed: int, turn: int) -> None:
        ...
        self._turn = turn
        self._speed = speed
        self._moves = True
      

We add the following method:


    def _vehicle_cont(self):
        self.move(self._speed, self._turn)
        self._to_stop = False
        self._last_t = None
      
Setting self._last_t = None demands the loop to act like a new movement.

We modify method task_factory:


        if drive_type == DRIVE_TYPE_STRAIGHT:
            t = task.Task(
                self._drive_straight,
                args=(speed, distance),
                action_stop=self.stop,
                action_cont=self._vehicle_cont,
                exc=exc
            )
            if distance != None:
                t.append(task.Repeated(self._test_pos))
      
This will call method _vehicle_cont when the task continues.

We change method drive_straight to start a task and join it:


    def drive_straight(self, speed: int, distance: float) -> None:
        self.task_factory(
            DRIVE_TYPE_STRAIGHT,
            speed,
            distance=distance
        ).start().join()
      
This helps to prevent double code. We started with a factory that called method drive_straight. Now we end with the opposite situation, method drive_straight calls the factory!

We test the continuation with this program:


#!/usr/bin/env python3

import ev3, ev3_vehicle, task, time, datetime

vehicle = ev3_vehicle.TwoWheelVehicle(
    0.02128,                 # radius_wheel
    0.1175,                  # tread
    protocol=ev3.BLUETOOTH,
    host='00:16:53:42:2B:99'
)
vehicle.verbosity = 1

speed = 25
drive = task.concat(
    vehicle.task_factory("straight", speed=speed, distance=0.5),
    vehicle.task_factory("stop", brake=True),
    task.Sleep(0.5),
    vehicle.task_factory("stop")
)

drive.start()
time.sleep(3)
drive.stop()
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(now, "*** called stop ***")
time.sleep(2)
drive.cont()
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(now, "*** called cont ***")
      
As you can see, the program consits of three parts:
  • creating and administrating class TwoWheelVehicle,
  • constructing task drive (a chain of tasks),
  • opearating task drive (needs no knowledge of the tasks inner structure).

its output:


09:17:04.074805 Sent 0x|1F:00|2A:00|00|08:00|B0:00:09:19:00:00:00:A6:00:09:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:04.124579 Recv 0x|0B:00|2A:00|02|78:0F:00:00:7B:0F:00:00|
09:17:04.226509 Sent 0x|15:00|2B:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:04.294573 Recv 0x|0B:00|2B:00|02|9D:0F:00:00:A0:0F:00:00|
09:17:04.566396 Sent 0x|15:00|2C:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:04.630528 Recv 0x|0B:00|2C:00|02|F4:0F:00:00:F5:0F:00:00|
09:17:05.237855 Sent 0x|15:00|2D:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:05.313547 Recv 0x|0B:00|2D:00|02|9F:10:00:00:A2:10:00:00|
09:17:06.604481 Sent 0x|15:00|2E:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:06.671509 Recv 0x|0B:00|2E:00|02|FA:11:00:00:FD:11:00:00|
09:17:07.077801 *** called stop ***
09:17:07.078564 Sent 0x|19:00|2F:00|00|08:00|A3:00:09:00:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:07.286495 Recv 0x|0B:00|2F:00|02|97:12:00:00:9A:12:00:00|
09:17:09.081348 *** called cont ***
09:17:10.608654 Sent 0x|1F:00|30:00|00|08:00|B0:00:09:19:00:00:00:A6:00:09:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:10.667424 Recv 0x|0B:00|30:00|02|C4:12:00:00:C6:12:00:00|
09:17:10.769170 Sent 0x|15:00|31:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:10.812384 Recv 0x|0B:00|31:00|02|E4:12:00:00:E5:12:00:00|
09:17:11.058759 Sent 0x|15:00|32:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:12.015337 Recv 0x|0B:00|32:00|02|17:14:00:00:18:14:00:00|
09:17:12.017042 Sent 0x|19:00|33:00|00|08:00|A3:00:09:01:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:12.060297 Recv 0x|0B:00|33:00|02|21:14:00:00:22:14:00:00|
09:17:12.563204 Sent 0x|19:00|34:00|00|08:00|A3:00:09:00:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:17:12.613220 Recv 0x|0B:00|34:00|02|21:14:00:00:22:14:00:00|
      
At a first look it works as designed! When continued, it again has small time gaps between the readings of the wheels positions, which then grow. A closer look at the times shows, that the continuation doesn't occur 2 sec. after the stopping. There is a gap of about 1.5 sec. between the call of method cont and the sending of the direct command, that continues the movement. This is worth to analyse.

Subclassing Task

We remember how Task objects are contined. Attribute _time_action holds the time of the next execution of their action. Method _cont2 determines the time of the next action of the continued task itself and all its contained tasks. When calling method _cont2 of the contained tasks it sets argument time_delta so that the old synchronization is reconstructed. This was fine for most situations but method sound of class Jukebox subclassed Task to realize the correct timing. In our situation, once more the standatrd is not what we need. Our continuation of the Repeated object is more the start of a new movement. This says, we again have to manipulate attribute _time_action to get the correct result. We do it the following way:


    def task_factory(self, drive_type: str, **kwargs) -> task.Task:
        ...
        class _Drive(task.Task):
            def stop(self):
                super().stop()
                self._time_action = time.time()

        if drive_type == DRIVE_TYPE_STRAIGHT:
            t = _Drive(
                    self._drive_straight,
                    args=(speed, distance),
                    action_stop=self.stop,
                    action_cont=self._vehicle_cont,
                    exc=exc
            )
            if distance != None:
                t.append(task.Repeated(self._test_pos))
        ...
      
Class _Drive is a subclass of task.Task. We modify the root task and not task.Repeated because method stop always is called via the root task. The only modification of _Drive objects is that their method stop sets _time_action to the actual time. I repeated the test and got this output:

09:45:54.792138 Sent 0x|1F:00|2A:00|00|08:00|B0:00:09:19:00:00:00:A6:00:09:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:54.837971 Recv 0x|0B:00|2A:00|02|21:14:00:00:22:14:00:00|
09:45:54.939915 Sent 0x|15:00|2B:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:54.983938 Recv 0x|0B:00|2B:00|02|40:14:00:00:40:14:00:00|
09:45:55.231559 Sent 0x|15:00|2C:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:55.309953 Recv 0x|0B:00|2C:00|02|8A:14:00:00:8B:14:00:00|
09:45:55.883439 Sent 0x|15:00|2D:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:55.936879 Recv 0x|0B:00|2D:00|02|35:15:00:00:36:15:00:00|
09:45:57.137283 Sent 0x|15:00|2E:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:57.186836 Recv 0x|0B:00|2E:00|02|74:16:00:00:74:16:00:00|
09:45:57.795134 *** called stop ***
09:45:57.795901 Sent 0x|19:00|2F:00|00|08:00|A3:00:09:00:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:57.856792 Recv 0x|0B:00|2F:00|02|20:17:00:00:20:17:00:00|
09:45:59.798555 *** called cont ***
09:45:59.862278 Sent 0x|1F:00|30:00|00|08:00|B0:00:09:19:00:00:00:A6:00:09:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:45:59.917685 Recv 0x|0B:00|30:00|02|3E:17:00:00:3F:17:00:00|
09:46:00.019364 Sent 0x|15:00|31:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:46:00.396673 Recv 0x|0B:00|31:00|02|AE:17:00:00:AF:17:00:00|
09:46:00.977447 Sent 0x|15:00|32:00|00|08:00|99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:46:01.392575 Recv 0x|0B:00|32:00|02|AF:18:00:00:AF:18:00:00|
09:46:01.635565 Sent 0x|19:00|33:00|00|08:00|A3:00:09:01:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:46:01.693532 Recv 0x|0B:00|33:00|02|FB:18:00:00:FC:18:00:00|
09:46:02.196212 Sent 0x|19:00|34:00|00|08:00|A3:00:09:00:99:1C:00:13:07:01:01:60:99:1C:00:10:07:00:01:64|
09:46:02.428546 Recv 0x|0B:00|34:00|02|FB:18:00:00:FB:18:00:00|
      
This is what I hoped to see!

Method drive_turn

I think, it's straight forward to code the changes of methods drive_turn, test_o and task_factory. At the end, it behaves like method drive_straight.

Method rotate_to

Actually method rotate_to calls method drive_turn. We have to change that. Once again we don't know the arguments, when the task is constructed. We code a new method _rotate_to:


    def _rotate_to(self, speed: int, orientation: float) -> None:
        diff = orientation - self._orientation
        diff += 180
        diff %= 360
        diff -= 180
        if diff >= 0:
            right_turn = False
            direction = 1
        else:
            right_turn = True
            direction = -1
        if abs(diff) >= 1:
            o_orig = self._orientation
            self.move_turn(speed, radius_turn=0, right_turn=right_turn)
            if diff > 0 and self._orientation > o_orig + diff:
                diff += 360
            elif diff < 0 and self._orientation < o_orig + diff:
                diff -= 360
        delta_pos = self._polarity * diff * 0.5 * self._tread / self._radius_wheel
        final_pos = [self._pos[0] - delta_pos,
                     self._pos[1] + delta_pos]
        self._test_args = (direction, o_orig + diff, final_pos)
     
You see the analogy with method _drive_straight. We start the movement and we calculate the arguments of method _test_o.

We modify method task_factory:


        ...
        elif drive_type == DRIVE_TYPE_ROTATE_TO:
            t = task.concat(
                _Drive(
                    self._rotate_to,
                    args=(speed, orientation),
                    action_stop=self.stop,
                    action_cont=self._vehicle_cont,
                    exc=exc
                ),
                task.Repeated(self._test_o)
            )
        ...
      

And we let method rotate_to call the task factory:


    def rotate_to(self, speed: int, orientation: float) -> None:
        self.task_factory(
            DRIVE_TYPE_ROTATE_TO,
            speed,
            orientation=orientation
        ).start().join()
      

Method drive_to

Method drive_to combines a turn with a straight movement. Again we don't know the arguments, which says we can't call methods rotate_to or drive_straight, but we will call _rotate_to and _drive_straight. Altogether we construct a chain of tasks with four links. We modify method task_factory:


 ...
        elif drive_type == DRIVE_TYPE_DRIVE_TO:
            t = task.concat(
                _Drive(
                    self._drive_to_1,
                    args=(speed, pos_x, pos_y),
                    action_stop=self.stop,
                    action_cont=self._vehicle_cont,
                    exc=exc,
                ),
                task.Repeated(self._test_o),
                task.Task(
                    self._drive_to_2,
                    args=(speed, pos_x, pos_y)
                ),
                task.Repeated(self._test_pos)
            )
        ...
      
Method _drive_to_1 will start the turn movement and calculate the final orientation. Method _drive_to_2 starts a straight movement and calculates the final position of the wheels.

We code the new method _drive_to_1:


    def _drive_to_1(
            self,
            speed: int,
            pos_x: float,
            pos_y: float
    ) -> None:
        diff_x = pos_x - self._pos_x
        diff_y = pos_y - self._pos_y
        dist = math.sqrt(diff_x**2 + diff_y**2)
        if abs(diff_x) > abs(diff_y):
            direct = math.degrees(math.atan(diff_y/diff_x))
        else:
            fract = diff_x / diff_y
            sign = math.copysign(1.0, fract)
            direct = sign * 90 - math.degrees(math.atan(fract))
        if diff_x < 0:
            direct += 180
        self._rotate_to(speed, direct)
      

And a new method _drive_to_2:


    def _drive_to_2(
            self,
            speed: int,
            pos_x: float,
            pos_y: float
    ) -> None:
        diff_x = pos_x - self._pos_x
        diff_y = pos_y - self._pos_y
        dist = math.sqrt(diff_x**2 + diff_y**2)
        self._drive_straight(speed, dist)
      

Again method drive_to calls the factory:


    def drive_to(
            self,
            speed: int,
            pos_x: float,
            pos_y: float
    ) -> None:
        self.task_factory(
            DRIVE_TYPE_DRIVE_TO,
            speed,
            pos_x=pos_x,
            pos_y=pos_y
        ).start().join()
      
This was it! All the movements of the vehicle stop prompt and can be continued.

Self driving vehicles

We use the Follow Me vehicle (lesson 5) with its infrared sensor. We want the vehicle to follow a course, but stop, when it detects a barrier. When the obstacle disappeares, the vehicle again follows its course (two times the shape of the digit eight). This is not, what Google or the car manufacturers mean by self driving, but it is what we can achieve with about 80 lines of code.


#!/usr/bin/env python3

import ev3, ev3_vehicle, task, struct
sensor = ev3.EV3(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
vehicle = ev3_vehicle.TwoWheelVehicle(0.02128, 0.1175, ev3_obj=sensor)

speed = 50
sensor_gap = 0.1

def look_forward(cont: bool=False):
    ops = b''.join([
        ev3.opInput_Device,
        ev3.READY_RAW,
        ev3.LCX(0),          # LAYER
        ev3.LCX(1),          # NO
        ev3.LCX(33),         # TYPE - EV3-IR
        ev3.LCX(0),          # MODE - Proximity
        ev3.LCX(1),          # VALUES
        ev3.GVX(0)           # VALUE1
    ])
    reply = sensor.send_direct_cmd(ops, global_mem=4)
    dist = struct.unpack('<i', reply[5:])[0]
    if dist < 10 and not cont:
        return True
    elif dist >= 10 and cont:
        return True

t_drive = task.concat(
    task.Repeated(
        task.concat(
            vehicle.task_factory(
                "turn",
                speed=speed,
                radius_turn=0.25,
                angle=360
            ),
            vehicle.task_factory(
                "turn",
                speed=speed,
                radius_turn=-0.25,
                angle=360
            )
        ).start,
        num=2
    ),
    vehicle.task_factory("stop", brake=True),
    task.Sleep(0.5),
    vehicle.task_factory("stop")
)
t_controller = task.concat(
    task.Task(t_drive.start),
    task.Periodic(
        sensor_gap,
        look_forward
    ),
    task.Task(t_drive.stop),
    task.Repeated(
        task.concat(
            task.Periodic(
                sensor_gap,
                look_forward,
                kwargs={"cont": True}
            ),
            task.Task(t_drive.cont, args=(0,)),
            task.Periodic(
                sensor_gap,
                look_forward
            ),
            task.Task(t_drive.stop)
        ).start
    )
)
t_drive = task.concat(
    t_drive,
    task.Task(t_controller.stop)
)
t_controller.start()
      
Again we see this checks and balances behaviour of two tasks. Task t_controller starts, stops and continues task t_drive, but it is t_drive that finally stops t_controller, when the course ends.

The interesting part is task t_controller. This happens with task t_drive: start, stop, then an unlimited number of cont and stop. It's important, that the iteration of Repeated ends with stop. Let's look at an alternative formulation:


t_controller = task.concat(
    task.Task(t_drive.start),
    task.Repeated(
        task.concat(
            task.Periodic(
                sensor_gap,
                look_forward
            ),
            task.Task(t_drive.stop),
            task.Periodic(
                sensor_gap,
                look_forward,
                kwargs={"cont": True}
            ),
            task.Task(t_drive.cont, args=(0,))
        ).start
    )
)
 
This version stops and continues once and never again. It's because the contained task t_drive.cont is joined until the driving ends and this prevents the next iteration of the surrounding Repeated.

Conclusion

The self driving vehicle demonstrates a mechanism of regulation by a controller, that reads sensor values. The task concept has its own coding patterns and it needs some experience to get familiar with it. Coding tasks is not simple, but it helps for an incremental working, where complexity grows step by step. When the construction of the tasks is done, they hide the compexity behind an API which always keeps its simple surface to the outside world.

Recursion makes stopping and continuation to impressive methods. The optional arguments action_stop and action_cont allow even more precision, but we have seen that sometimes we need subclassing.

Both, the family of tasks (Task, Repeated, Periodic and Sleep) and class TwoWheelVehicle have reached a quality, which allows to realize complex programs. I hope you have some ideas. It would be great to hear from.

Friday, 27 May 2016

Lesson 9 - Stop and Continue

EV3 Direct commands - Lesson 09

Introduction

We are on the way, to write classes, that encapsulate an unlimited number of (related) actions into tasks objects. All tasks own the same easy handling which is independent of the type of actions. Moving a robot, reading sensor values, playing tones etc., all these procedures can be task objects with the methods: start, stop, cont (continue) and join:


     |  cont(self, gap:float=None) -> 'Task'
     |      continues a stopped task (must be a root task)
     |      
     |      Keyword Arguments:
     |      gap: sets the waiting time before the next action occurs (in seconds)
     |  
     |  join(self) -> None
     |      joins the thread of the task
     |  
     |  start(self, gap:float=0) -> 'Task'
     |      starts execution of task (finished or stopped tasks may be started again)
     |      
     |      Keyword Arguments:
     |      gap: sets the waiting time, before start occurs (in seconds)
     |  
     |  stop(self) -> None
     |      stops execution as fast as possible
     |          allows to continue with method cont or restart with method start
     |          already finished tasks silently do nothing
      
This makes the usage of task objects simple and allows to think about their dependencies instead of fighting with technical details. Task objects use multithreading. This allows to start parallel tasks or even construct tasks, which inside operate parallel actions. The creative part is the construction of task objects and this can be done behind the scenes. The outside world handles task objects, which are ready to use. The subclasses of class EV3 will produce them. This says, at the end, our class TwoWheelVehicle will return tasks, that drive the vehicle. As a consequence the vehicles movements can be stopped and continued and we can run other tasks parallel while a movement takes place.

But yet we are not users of task objects, we have to code them! Usage will be simple, coding will be not. This lesson has three topics:

  • Handling exceptions
  • Stopping tasks and preparing continuation
  • Continuing tasks
Again we use class Jukebox for tests.

Handling Exceptions

One of the open topics from last lesson is the error handling in task objects. We know from lesson 7, that throwing an exception will not influence foreign threads. In programs with multithreading, we need a central place of information about exceptions. If all our threads ask regularly if there has been an exception somewhere, they can react on it. We use a very simple mechanism, where the reaction is an exit.

The central place is a class, what else:


class ExceptionHandler:
    def __init__(self):
        self._exc = False

    def put(self, exc: Exception):
        self._exc = True

    def fire(self):
        if self._exc: sys.exit(1)
      
Its API:

    class ExceptionHandler(builtins.object)
     |  Handles Exceptions of task objects
     |  If anywhere an exceptions occured and was put to the ExceptionHandler,
     |  any thread that uses the same instance of ExceptionHandler exits,
     |  when it calls its method fire
     |  
     |  Methods defined here:
     |  
     |  __init__(self)
     |      Initialize self.  See help(type(self)) for accurate signature.
     |  
     |  fire(self)
     |      fires sys.exit(1) if an exception occured, else does nothing
     |  
     |  put(self, exc:Exception)
     |      informs, that an exception occured
     |      
     |      Arguments:
     |      exc: Exception, ignored, but subclasses may distinguish
      
Method put has a parameter exc, which is never used. This is for future subclasses, which may distinguish between different types of exceptions.

How do we use this class? Let's look at an example:


    def start(self) -> None:
        self._root._exc.fire()
        self._root._lock.acquire()
        try:
            assert self._root is self, 'only root tasks can be started'
            assert self._state in [
                STATE_INIT,
                STATE_STOPPED,
                STATE_FINISHED
            ], "can't start from state " + self._state
        except Exception as exc:
            self._root._exc.put(exc)
            self._root._lock.release()
            raise
        ...
      
Some remarks:
  • Attribute _exc holds the ExceptionHandler of the task.
  • method fire is called in an unlocked state. If the task would be locked, when the thread exits, this blocked other threads.
  • Raising the exception also occurs in an unlocked state.
  • The first thing, method start does, is asking if somewhere an exception occured. If so, its thread exits.
  • If one of the assertions throws an AssertionException, this is put to the tasks ExceptionHandler, then the exception is raised.

We formulate some rules:

  • Before locking (acquire the Lock object), always call method fire of the tasks ExceptionHandler.
  • Before raising an exception, it must be put to the tasks ExceptionHandler.
  • All raising of exceptions must occur in an unlocked state.
  • Use one ExceptionHandler for all your tasks. If not, this needs strong arguments.

As a consequence we add a class attribute to class Task:


class Task:
    _exc_default = ExceptionHandler()
      
This will be the default of all instances of class Task

We add some code to the constructor of Task objects:


    def __init__(self, action: typing.Callable, **kwargs):
        self._action = action
        self._args = kwargs.pop('args', ())
        self._kwargs = kwargs.pop('kwargs', {})
        self._join = kwargs.pop('join', False)
        self._duration = kwargs.pop('duration', None)
        self._num = kwargs.pop('num', 0)
        self._next = None
        self._root = self
        self._time_end = None
        self._netto_time = False
        self._cnt = 0
        # the following are root only attributes
        self._state = STATE_INIT
        self._thread = None
        self._lock = threading.Lock()
        self._cond = threading.Condition(self._lock)
        self._last = None
        self._activity = ACTIVITY_NONE
        self._time_action = None
        self._contained = []
        self._exc = kwargs.pop('exc', self._exc_default)
        self._exc.fire()
        assert not kwargs, 'unknown keyword arguments: ' + str(kwargs.keys())
      
The number of keyword arguments has grown to a limit, where I prefer to use a keyworded variable length argument list **kwargs.

The subclasses Periodic and Repeated also use **kwargs. F.i. class Periodic:


class Periodic(Task):
    def __init__(self, intervall: float, action: typing.Callable, **kwargs):
        self._intervall = intervall
        self._netto_time = kwargs.pop('netto_time', False)
        assert not kwargs['join'], "no keyword argument 'join' for instances of class Periodic"
        if hasattr(action, '__self__') and \
           isinstance(action.__self__, Task) and \
           action.__name__ == "start":
            kwargs.update({'join': True})
        else:
            kwargs.update({'join': False})
        super().__init__(action, **kwargs)
        assert isinstance(self._intervall, numbers.Number), 'intervall must be a number' + intervall
        assert self._intervall >= 0, 'intervall must be positive'
        assert isinstance(self._netto_time, bool), 'netto_time must be a bool value'
      

Class Sleep:


class Sleep(Task):
    def __init__(self, seconds: float, exc: ExceptionHandler=None):
        if not exc:
            exc = self._exc_default
        super().__init__(self._do_nothing, duration=seconds, exc=exc)
      

Stopping Tasks

We code more logic than needed for stopping. We also prepare continuation because we don't like to go twice trough the code.

States

We already know some states from the last lesson:


STATE_INIT = 'INIT'
STATE_STARTED = 'STARTED'
STATE_FINISHED = 'FINISHED'
      
Let's talk about their meaning:
  • INIT: The initial state of a task. When a task is constructed, but never started, its state is INIT. After a tasks start, it will never return to state INIT.
  • STARTED: The call of method start changes the tasks state from INIT or FINISHED to state STARTED. While the whole regular execution of the task, it stays in state STARTED. When the tasks last action is finished, the state changes from STARTED to FINISHED.
  • FINISHED: The final state of a task. When a task finished regularly and was not stopped, its state is FINISHED. A task in state FINISHED can be started again.
We extend these states:

STATE_INIT = 'INIT'
STATE_TO_START = 'TO_START'
STATE_STARTED = 'STARTED'
STATE_TO_STOP = 'TO_STOP'
STATE_STOPPED = 'STOPPED'
STATE_TO_CONTINUE = 'TO_CONTINUE'
STATE_FINISHED = 'FINISHED'
      
The meaning of the additional states:
  • TO_START: We set it if method start was called with an argument gap, which schedules the starting for the future. In the meantime the state is TO_START. State TO_START signals, there is no execution or sleeping in progress.
  • TO_STOP: Method stop already was called (this changed the state STARTEDTO_STOP) and the task or its contained tasks have not yet ended their last action. If a call of start follows, while the old execution still is in progress, the series of states is: STARTEDTO_STOPTO_STARTSTARTED.
  • STOPPED: The task ended an action and red the state TO_STOP. This prevents the execution of the next action or sleeping and the state changes TO_STOPSTOPPED.
  • TO_CONTINUE: Method cont was called, and it waits to execute the next action. State TO_CONTINUE signals (like INIT, STOPPED, FINISHED or TO_START), there is no execution or sleeping in progress.
While an action is executed, there can be a series of method-calls, f.i. stop, start, stop. Only the first of them results in a change of the state: STARTEDTO_STOP. We need another criterion to identify the situations after the second or third call of a method. This is the existence of the new thread (_thread_start or _thread_cont). The second call of start) creates the new thread _thread_start, the next call of stop prevents the new thread from executing an action.

The combination of the state and the existence of new threads makes the full understanding of the situation:

  • TO_STOP and _thread_start != None: methods stop and start were called while the last action was executed.
  • TO_STOP and _thread_cont != None: methods stop and cont were called while the last action was executed.
  • TO_STOP and _thread_start == None and _thread_cont == None: the last call was method stop and the last action still executes.

Responsibilities for state transitions

We look, which parts of our tasks are responsible for changes of the state (without method cont).

  • Method start and its followers _start2 and _start3:
    • From [INIT, STOPPED, FINISHED] to STARTED: if called with gap == 0.
    • From [INIT, STOPPED, FINISHED] to TO_START: if called with gap > 0.
    • Unchanged state TO_STOP: the old thread still is executing. A new thread is created and started, but the state remains TO_STOP.
    • From TO_START to STARTED: when the old thread ended and gap is over.
  • Method stop:
    • From STARTED to TO_STOP: never changes directly from STARTED to STOPPED.
    • From TO_START to STOPPED: TO_START signals, that _execute was not yet called.
    • From TO_CONTINUE to STOPPED: the old thread came to its end, the new thread did not yet call _execute.
  • Method _execute:
    • From STARTED to FINISHED: This is the regular case.
    • From TO_STOP to FINISHED: The actual action ended and there was no next one and no final sleeping.
    • From TO_STOP to TO_START: The actual action ended and a new thread _thread_start already was started from method start.
    • From TO_STOP to TO_CONTINUE: The actual action ended and a new thread _thread_cont already was started from method cont.
    • From TO_STOP to STOPPED: The actual action ended and there were no new threads.

Additional Attributes

We add attributes to class Task:


class Task:
    _exc_default = ExceptionHandler()
    _contained_register = {}
    
    def __init__(self, action: typing.Callable, **kwargs):
        ...
        # the following are root only attributes
        ...
        self._action_stop = kwargs.pop('action_stop', None)
        self._args_stop = kwargs.pop('args_stop', ())
        self._kwargs_stop = kwargs.pop('kwargs_stop', {})
        self._action_cont = kwargs.pop('action_cont', None)
        self._args_cont = kwargs.pop('args_cont', ())
        self._kwargs_cont = kwargs.pop('kwargs_cont', {})
        self._thread_start = None
        self._thread_cont = None
        self._actual = None
        self._cont_join = None
        self._time_called_stop = None
        self._restart = False
      
Their meaning:
  • _action_stop: The default stopping only prevents the next action from execution. Often it additionally needs a stopping action, f.i. stopping the sound or a movement.
  • _args_stop: The stopping action may have positional arguments.
  • _kwargs_stop: The stopping action may have keyword arguments.
  • _action_cont: like _action_stop, f.i. when _action_stop ends a movement, _action_cont restarts it.
  • _args_cont: positional arguments of _action_cont.
  • _kwargs_cont: keyword arguments of _action_cont.
  • _thread_start: We already discussed it. The old thread _thread may be in state TO_STOP when method start is called. Then attribute _thread_start holds the new thread as long as the old one also is needed.
  • _thread_cont: like thread_start, but for continuation.
  • _actual: holds the actual link in the chain. Continuation needs to know it.
  • _cont_join: If the task was stopped while joining a contained task, this attribute holds the task to join.
  • _time_called_stop: Continuation has to continue at the correct time. It takes value _time_action and adds the time distance between the call of the method stop and the time when it could start executing the next action.
  • _restart: If there was a sequence of method calls, while the state was TO_STOP, f.i. start, stop, cont, the last continuation needs to know, that is has to restart instead of continue.

Method stop

We add method stop:


    def stop(self) -> None:
        self._root._exc.fire()
        self._root._lock.acquire()
        try:
            assert self is self._root, 'only root tasks can be stopped'
            assert self._state in [
                STATE_TO_START,
                STATE_STARTED,
                STATE_TO_STOP,
                STATE_TO_CONTINUE,
                STATE_FINISHED
            ], "can't stop from state: " + self._state
            assert self._state != STATE_TO_STOP or self._thread_start or self._thread_cont, \
                "stopping is already in progress"
        except Exception as exc:
            self._root._exc.put(exc)
            self._root._lock.release()
            raise
        if self._state == STATE_FINISHED:
            self._lock.release()
            return
        if self._time_called_stop is None:
            self._time_called_stop = time.time()
        if self._activity is ACTIVITY_SLEEP:
            self._cond.notify()
        not_stopped = []
        for task in self._contained:
            if not task in self._contained_register or \
               not self._contained_register[task] is self:
                continue
            task.lock.acquire()
            if task._state in [STATE_STARTED, STATE_TO_START, STATE_TO_CONTINUE]:
                not_stopped.append(task)
            elif task._state == STATE_TO_STOP and (task._thread_start or task._thread_cont):
                not_stopped.append(task)
            task.lock.release()
        for task in not_stopped:
            task.stop()
        if self._state == STATE_STARTED:
            self._state = STATE_TO_STOP
        elif self._thread_start:
            self._thread_start = None
            if self._state == STATE_TO_START:
                self._state = STATE_STOPPED
        else:
            self._thread_cont = None
            if self._state == STATE_TO_CONTINUE:
                self._state = STATE_STOPPED
        self._lock.release()
      
Step by step we go through this code:
  • If the task already is finished, it silently does nothing:
    
            if self._state == STATE_FINISHED:
                self._lock.release()
                return
       
  • The time of the first call of stop is needed for the correct timing in method cont:
    
            if self._time_called_stop is None:
                self._time_called_stop = time.time()
       
  • If the task is sleeping, we interrupt the sleeping:
    
            if self._activity is ACTIVITY_SLEEP:
                self._cond.notify()
       
  • Stopping all contained tasks:
    
            not_stopped = []
            for task in self._contained:
                if not task in self._contained_register or \
                   not self._contained_register[task] is self:
                    continue
                task.lock.acquire()
                if task._state in [STATE_STARTED, STATE_TO_START, STATE_TO_CONTINUE]:
                    not_stopped.append(task)
                elif task._state == STATE_TO_STOP and (task._thread_start or task._thread_cont):
                    not_stopped.append(task)
                task.lock.release()
            for task in not_stopped:
                task.stop()
       
    This works recursive and stops all direct or indirect children tasks.
  • Changing state STARTEDTO_STOP:
    
            if self._state == STATE_STARTED:
                self._state = STATE_TO_STOP
       
  • If there was a call of method start, its new thread looses its reference:
    
            elif self._thread_start:
                self._thread_start = None
       
    This signals method start that the thread has to end before it reaches state STARTED
  • If there was a call of method cont, its new thread looses its reference:
    
            else:
                self._thread_cont = None
                if self._state == STATE_TO_CONTINUE:
                    self._state = STATE_STOPPED
       
    This signals method cont that the thread has to end before it reaches state STARTED

Method start

Method start has to handle the situation, when it finds a task in state TO_STOP and it got a keyword argument gap:


    def start(self, gap: float=0) -> 'Task':
        self._root._exc.fire()
        self._root._lock.acquire()
        try:
            assert isinstance(gap, numbers.Number), 'gap needs to be a number'
            assert gap >= 0, 'gap needs to be positive'
            assert self._root is self, 'only root tasks can be started'
            assert self._state in [
                STATE_INIT,
                STATE_TO_STOP,
                STATE_STOPPED,
                STATE_FINISHED
            ], "can't start from state " + self._state
            assert self._thread_start is None, "starting is already in progress"
            assert self._thread_cont is None, "continuation is already in progress"
        except Exception as exc:
            self._root._exc.put(exc)
            self._root._lock.release()
            raise
        if self._state == STATE_TO_STOP or gap > 0:
            if self._state == STATE_TO_STOP:
                self._restart = True
            else:
                self._state = STATE_TO_START
            if gap:
                self._thread_start = threading.Thread(
                    target=self._start2,
                    args=(time.time() + gap,)
                )
            else:
                self._thread_start = threading.Thread(target=self._start2)
            self._thread_start.start()
        else:
            self._start3()
            self._thread = threading.Thread(target=self._execute)
            self._thread.start()
        return self
      

Method _start2


    def _start2(self, time_action: float=None) -> None:
        if self._state == STATE_TO_STOP:
            self._lock.release()
            self._thread.join()
            self._exc.fire()
            self._lock.acquire()
        if not threading.current_thread() is self._thread_start:
            self._lock.release()
            return
        if time_action:
            gap = time_action - time.time()
            if gap > 0:
                self._activity = ACTIVITY_SLEEP
                self._cond.wait(gap)
                self._activity = ACTIVITY_NONE
                if not threading.current_thread() is self._thread_start:
                    self._lock.release()
                    return
        self._thread = self._thread_start
        self._thread_start = None
        self._start3()
        self._execute()
      
Annotations:
  • First it joins the old thread.
  • Then it tests if there was a call of method stop. If so, it returns without changing the state or executing something.
  • If method start was called with keyword argument gap, it waits until its time has come. Then it tests if meanwhile there was a call of stop.
  • Its thread becomes the thread of the task and it executes its actions.

Method _start3

These are a few lines of code, we need twice (in methods start and _start2):


    def _start3(self) -> None:
        self._state = STATE_STARTED
        self._restart = False
        self._time_called_stop = None
        self._actual = self
        self._cnt = 0
        self._time_action = time.time()
        if self._duration != None:
            self._time_end = self._time_action + self._duration
      

Method join

Joining needs to join all threads, the old and the new ones:


    def join(self) -> None:
        try:
            assert self._root is self, "only root tasks can be joined"
            assert self._state != STATE_INIT, "can't join tasks in state " + str(self._state)
        except Exception as exc:
            self._root._exc.put(exc)
            raise
        self._exc.fire()
        try: self._thread_start.join()
        except Exception: pass
        try: self._thread_cont.join()
        except Exception: pass
        try: self._thread.join()
        except Exception: pass
      

Method _execute

While a tasks action is executed or while it's sleeping, it releases its lock. This allows to execute method stop, which changes the state STARTEDTO_STOP. Method _execute needs to handle state TO_STOP and it needs to react as fast as possible:


    def _execute(self) -> None:
        while True:
            if self._root._state != STATE_STARTED:
                self._final(outstand=True)
                return
            try:
                gap = self._wrapper()
            except Exception as exc:
                self._exc.put(exc)
                raise
            self._cnt += 1
            if gap == -1 or self._num > 0 and self._cnt >= self._num:
                self._root._time_action = time.time()
                break
            if gap == 0:
                self._root._time_action = time.time()
                continue
            if self._netto_time:
                self._root._time_action = time.time() + gap
                real_gap = gap
            else:
                self._root._time_action += gap
                real_gap = self._root._time_action - time.time()
            if real_gap > 0:
                if self._root._state != STATE_STARTED:
                    self._final(outstand=True)
                    return
                self._root._activity = ACTIVITY_SLEEP
                self._root._cond.wait(real_gap)
                self._root._activity = ACTIVITY_NONE
        if self._time_end:
            self._root._time_action = self._time_end
            gap = self._root._time_action - time.time()
            if self._root._state == STATE_STARTED and gap > 0:
                self._root._activity = ACTIVITY_SLEEP
                self._root._cond.wait(gap)
                self._root._activity = ACTIVITY_NONE
            if self._root._state == STATE_STARTED:
                self._time_end = None
            elif not self is self._root:
                self._root._time_end = self._time_end
                self._time_end = None
        else:
            self._root._time_action = time.time()
        if self._next:
            self._root._actual = self._next
            self._next._cnt = 0
            self._root._time_end = None
            if self._next._duration != None:
                self._next._time_end = self._root._time_action + self._next._duration
            self._next._execute()
        else:
            self._final()
      
The last task may have an argument duration. If we stop it while its last sleeping, the root task will end with attribute _time_end but not _actual. This signals: the last sleeping was not finished.

Method _wrapper1


    def _wrapper1(self) -> None:
        if hasattr(self._action, '__self__') and \
           isinstance(self._action.__self__, Task) and \
           self._action.__name__ in ["start", "cont", "join"]:
            task = self._action.__self__
            name = self._action.__name__
            if (self._join or name is "join"):
                self._root._cont_join = task
            if name in ["start", "cont"]:
                if not task in self._root._contained:
                    self._root._contained.append(task)
                self._contained_register.update({task: self._root})
        if not hasattr(self._action, '__self__') or \
           not isinstance(self._action.__self__, Task) or \
           not self._action.__name__ in ["start", "cont"] or \
           self._action.__name__ == "start" and self._join:
            self._root._activity = ACTIVITY_BUSY
            self._root._lock.release()
            self._root._exc.fire()
      
Remarks:
  • Method cont is not time consuming, there is no need for releasing and aquiring the lock.
  • Attribute _cont_join needs to be set if we join a task.
  • If the action is the continuation of a task, we have to actualize the roots attribute _contained and the class attribute _contained_register.

Method wrapper2

The corresponding logic after the execution of an action:


    def _wrapper2(self) -> None:
        if self._join:
            self._action.__self__._thread.join()
        if not hasattr(self._action, '__self__') or \
           not isinstance(self._action.__self__, Task) or \
           not self._action.__name__ in ["start", "cont"] or \
           self._action.__name__ == "start" and self._join:
            self._root._exc.fire()
            self._root._lock.acquire()
            self._root._activity = ACTIVITY_NONE
        if hasattr(self._action, '__self__') and \
           isinstance(self._action.__self__, Task) and \
           self._action.__name__ in ["start", "stop", "cont", "join"]:
            task = self._action.__self__
            name = self._action.__name__
            state = task.state
            if self._root._cont_join and \
               (self._root._state == STATE_STARTED or \
                state == STATE_FINISHED):
                self._root._cont_join = None
            if (state == STATE_FINISHED or name == "stop") and \
               task in self._root._contained:
                self._root._contained.remove(task)
            if name == "stop" and \
               task in self._contained_register:
                self._contained_register.pop(task)
      

Method _final


    def _final(self, outstand=False) -> None:
        self._root._contained = self._join_contained()
        if self._root._state == STATE_STARTED:
            self._root._state = STATE_FINISHED
        elif self._root._state == STATE_TO_STOP:
            if not self._next and \
               not self._root._contained and \
               not self._root._time_end and \
               not outstand:
                self._root._state = STATE_FINISHED
            elif self._root._action_stop:
                self._root._action_stop(
                    *self._root._args_stop,
                    **self._root._kwargs_stop
                )
        if self._root._state == STATE_FINISHED:
            if self._root in self._contained_register:
                self._contained_register.pop(self._root)
            self._root._thread_cont = None
            self._root._actual = None
            self._root._time_action = None
        else:
            if not self._next and not outstand:
                self._root._actual = None
                self._root._time_action = None
            if self._root._thread_start:
                self._root._actual = None
                self._root._time_action = None
                self._root._state = STATE_TO_START
            elif self._root._thread_cont:
                self._root._state = STATE_TO_CONTINUE
            else:
                self._root._state = STATE_STOPPED
        if self._root._time_action and self._root._time_action < time.time():
            self._root._time_action = None
        self._root._lock.release()
      
Remarks:
  • There is a lot of logic for changing the state.
  • The stopping action may be called.
  • The new attributes need to be set.
These were all modifications of the code.

Class Jukebox

In the lessons 7 and 8 we used class Jukebox to demonstrate multithreading and the chances of the task concept. Now we modify method sound and prepare it to stop appropriate then we do the same with method song.

Modifying method sound

We change method sound:


    def sound(self, path: str, duration: float=None, repeat: bool=False) -> task.Task:
        if repeat:
            ops = b''.join([
                ev3.opSound,
                ev3.REPEAT,
                ev3.LCX(self._volume), # VOLUME
                ev3.LCS(path)          # NAME
            ])
        else:
            ops = b''.join([
                ev3.opSound,
                ev3.PLAY,
                ev3.LCX(self._volume), # VOLUME
                ev3.LCS(path)          # NAME
            ])
        if not repeat and not duration:
            return task.Task(
                self.send_direct_cmd,
                args=(ops,)
            )
        elif not repeat and duration:
            t_inner = task.Task(
                self.send_direct_cmd,
                args=(ops,),
                duration=duration,
                action_stop=self.stop
            )
            return task.Task(t_inner.start, join=True)
        elif repeat and not duration:
            t_inner = task.Task(
                self.send_direct_cmd,
                args=(ops,),
                duration=9999999,
                action_stop=self.stop,
                action_cont=self.send_direct_cmd,
                args_cont=(ops,)
            )
            return task.Task(t_inner.start, join=True)
        elif repeat and duration:
            t_inner = task.Task(
                self.send_direct_cmd,
                args=(ops,),
                duration=duration,
                action_stop=self.stop,
                action_cont=self.send_direct_cmd,
                args_cont=(ops,)
            )
            return task.Task(t_inner.start, join=True)
      
There is no direct command to continue a sound file. If you compare with our first version from lesson 8:
  • not repeat and not duration: this is unchanged, we implement no stopping because the task is not time consuming. This type of calling method sound should be used for short sound signals.
  • not repeat and duration: it stops the sound, but in case of continuation it will wait silently. Restarting does not fit the intended timing.
  • repeat and not duration: a task with endless duration has actions for stopping and continuation. It needs a final stop to end the playing of the sound file.
  • repeat and duration: this is straight foreward but later we will see, that it is not perfect.

Tests of method sound

not repeat and not duration

We stop the task directly after its start, this will change the state STARTEDTO_STOP. The task will wait until the action is finished, then change the state TO_STOPFINISHED. We run the following program:


#!/usr/bin/env python3

import ev3, ev3_sound, time

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1

t = jukebox.sound('./ui/DownloadSucces')
print("state:", t.state)
t.start()
print("state:", t.state)
t.stop()
print("state:", t.state)
time_stop = time.time()
t.join()
time_join = time.time()
print("state: {}, duration of stopping: {}, time_action: {}".format(
    t.state,
    time_join - time_stop,
    t.time_action
))
      
and get this output:

state: INIT
07:32:35.683207 Sent 0x|1D:00|2A:00|80|00:00|94:02:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
state: STARTED
state: TO_STOP
state: FINISHED, duration of stopping: 0.0002593994140625, time_action: None
      
Indeed, the call of method stop returned immediately and set the state to TO_STOP. A very short time of less than 1 thousandth sec. later, the task was finished. The sequence of states was INITSTARTEDTO_STOPFINISHED.

not repeat and duration

This uses a contained task. We print data from both tasks. Of special interest is attribute _time_end of the inner task. It holds the rest of the original duration.


#!/usr/bin/env python3

import ev3, ev3_sound, time

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1

t = jukebox.sound('./ui/DownloadSucces', duration=1)
print("state:", t.state)
t.start()
print("state: {}, state of contained task: {}".format(t.state, t._cont_join.state))
t.stop()
print("state: {}, state of contained task: {}".format(t.state, t._cont_join.state))
time_stop = time.time()
t.join()
time_join = time.time()
print("duration of stopping: {}, state: {}, time_action: {}".format(
    time_join - time_stop,
    t.state,
    t.time_action
))
print("Contained task:")
print("state: {}, actual: {}, time_end: {}".format(
    t._cont_join.state,
    t._cont_join._actual,
    t._cont_join._time_end - time.time()
))
      
The output:

state: INIT
07:55:56.404427 Sent 0x|1D:00|2A:00|80|00:00|94:02:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
state: STARTED, state of contained task: STARTED
state: TO_STOP, state of contained task: TO_STOP
07:55:56.405752 Sent 0x|07:00|2B:00|80|00:00|94:00|
duration of stopping: 0.0015096664428710938, state: STOPPED, time_action: None
Contained task:
state: STOPPED, actual: None, time_end: 0.9963183403015137
      
The program sent two direct commands. The first started the playing, the second stopped it. The stopping needed a bit more time, it had to stop the contained task too. Both tasks ended in state STOPPED and _time_end holds the rest of the original duration. When calling method cont, the task will silently wait because there is no actual action (attribute actual is None) and no action_cont.

repeat and not duration

We start an unlimited repeated playing of a sound file and stop it after three seconds:


#!/usr/bin/env python3

import ev3, ev3_sound, time

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1

t = jukebox.sound('./ui/DownloadSucces', repeat=True)
t.start()
time.sleep(3)
t.stop()
t.join()
print("Contained task:")
print("state: {}, actual: {}, time_end: {}".format(
    t._cont_join.state,
    t._cont_join._actual,
    t._cont_join._time_end - time.time()
))
print(t._cont_join._action_cont)
      
The output:

08:31:22.900092 Sent 0x|1D:00|2A:00|80|00:00|94:03:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
08:31:25.903642 Sent 0x|07:00|2B:00|80|00:00|94:00|
Contained task:
state: STOPPED, actual: None, time_end: 999999995.9944854
<bound method EV3.send_direct_cmd of <ev3_sound.Jukebox object at 0x7f61d5be19b0>>
      
The rest of the duration is very long now and the inner task it will not silently continue because attribute _action_cont is set.

repeat and duration

We don't test the stopping but we come back to it, when we test the continuation.


#!/usr/bin/env python3

import ev3, ev3_sound, time

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1

t = jukebox.sound('./ui/DownloadSucces', repeat=True, duration=3)
t.start().join()
print("state:", t.state)
      
The output:

08:41:59.082772 Sent 0x|1D:00|2A:00|80|00:00|94:03:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
08:42:02.083546 Sent 0x|07:00|2B:00|80|00:00|94:00|
state: FINISHED
      
It seems to work.

Modifying method song

Method song returns a Task object with two contained tasks, one for colors, one for tones. We managed to stop the colors, when the sequence of tones ends, but we didn't like our solution. The new method stop allows to make colors and tones independent and group the colors around the tones:


task.concat(
    task.Task(colors.start),
    task.Task(tones.start, join=True),
    task.Task(colors.stop)
)
      

Our modifications:

  • We remove method play_song. Calling jukebox.song(ev3_sound.HAPPY_BIRTHDAY).start() does the job as well as jukebox.play_song(ev3_sound.HAPPY_BIRTHDAY).
  • We remove attribute _plays.
  • We simplify method stop:
    
        def stop(self) -> None:
            self.send_direct_cmd(ev3.opSound + ev3.BREAK)
       
  • We modify the task factory song:
    
        def song(self, song: dict) -> task.Task:
            tones = task.concat(
                task.Task(
                    self._init_tone,
                    action_stop=self.stop
                ),
                task.Repeated(
                    self._next_tone,
                    args=(song,)
                ),
                task.Task(self.stop)
            )
            colors = task.Periodic(
                60 * song["beats_per_bar"] / song["tempo"],
                self._next_color,
                args=(song,)
            )
            if "upbeat" in song:
                colors = task.concat(
                    task.Sleep(60 * song["upbeat"] / song["tempo"]),
                    colors
                )
            colors = task.concat(
                task.Task(
                    self._init_color,
                    action_stop=self.change_color,
                    args_stop=(ev3.LED_GREEN,)
                ),
                colors
            )
            return task.concat(
                task.Task(colors.start),
                task.Task(tones.start, join=True),
                task.Task(colors.stop)
            )
       
All four members of the task family, Task, Repeated, Periodic and Sleep are in use. Method song returns a Task object which can be combined with other task objects.

Testing method song

We test it with this program:


#!/usr/bin/env python3

import ev3, ev3_sound, task, time

jukebox = ev3_sound.Jukebox(
    protocol=ev3.BLUETOOTH,
    host='00:16:53:42:2B:99'
)
t = task.concat(
    jukebox.song(ev3_sound.HAPPY_BIRTHDAY),
    task.Sleep(1),
    jukebox.song(ev3_sound.TRIAS)
)
t.start()
time.sleep(9)
t.stop()
t.start(2)
      
It plays Happy Birthday for 9 sec., then it stops for 2 sec. (the color changes to green), then ist starts again and plays Happy Birthday, which is followed by the trias. The handling of contained task seems to be correct!

Tasks with long-lasting actions

Stopping

We code an action that lasts two sec. The stopping is called, when the action was executed since one sec. and needs another sec. to finish:


#!/usr/bin/env python3

import task, time

def first_action():
    print("first action begin")
    time.sleep(2)
    print("first action end")

t = task.concat(
    task.Task(first_action),
    task.Task(print, args=("second action",))
)
t.start()
print("task started, state:", t.state)
time.sleep(1)
t.stop()
time_stop = time.time()
print("task stopped, state:", t.state)
t.join()
time_join = time.time()
print("duration of joining:", time_join - time_stop)
print("task joined, state:", t.state)
print("time_action:", t.time_action)
      
We expect that the state TO_STOP lasts one sec. because the stopping algorithm has to wait until the actual action is finished. Then the state will change TO_STOPSTOPPED. The output:

first action begin
task started, state: STARTED
task stopped, state: TO_STOP
first action end
duration of joining: 1.0017707347869873
task joined, state: STOPPED
time_action: None
      
Indeed, the stopping needed about one sec. The value None of property time_action says: no limitation, the next action can be started as fast as possible.

Restarting

We call a series of methods start and stop, while the stopping is in execution:


#!/usr/bin/env python3

import task, time

def first_action():
    print("first action begin")
    time.sleep(2)
    print("first action end")

t = task.concat(
    task.Task(first_action),
    task.Task(print, args=("second action",))
)
t.start()
time.sleep(1)
t.stop()
print("task stopped, state:", t.state)
t.start()
print("task started, state:", t.state, " thread_start:", t._thread_start)
t.stop()
print("task stopped, state:", t.state, " thread_start:", t._thread_start)
      
The output:

first action begin
task stopped, state: TO_STOP
task started, state: TO_STOP  thread_start: <Thread(Thread-2, started 139977212032768)>
task stopped, state: TO_STOP  thread_start: None
first action end
      
The first call of method stop changed the state STARTEDTO_STOP. From then on it needs one sec. until the action ends. In this time the program calls another start and another stop. Method start creates and starts thread _thread_start, but it does not execute anything because its reference disappeares while it is waiting for the end of the old thread.

Continue a stopped task

We prepared continuation when we realized method stop. Now we add method cont.

Method cont

Like method start, cont starts a thread and returns immediately:


    def cont(self, gap: float=None) -> 'Task':
        self._exc.fire()
        self._lock.acquire()
        try:
            assert self is self._root, 'only root tasks can be continued'
            assert gap is None or isinstance(gap, numbers.Number), 'gap needs to be a number'
            assert gap is None or gap >= 0, 'gap needs to be positive'
            assert self._state in [
                STATE_STOPPED,
                STATE_TO_STOP,
                STATE_FINISHED
            ], "can't continue from state: {} (task: {})".format(
                self._state,
                self
            )
            assert self._thread_start is None, "starting is already in progress"
            assert self._thread_cont is None, "continuation is already in progress"
        except Exception as exc:
            self._root._exc.put(exc)
            self._root._lock.release()
            raise
        if self._state == STATE_FINISHED:
            self._lock.release()
            return self
        if gap is None:
            self._thread_cont = threading.Thread(target=self._cont2)
        else:
            self._thread_cont = threading.Thread(
                target=self._cont2,
                kwargs={"time_cont": time.time() + gap}
            )
        self._thread_cont.start()
        return self
      
Annotations:
  • If the task already is finished, it does nothing and returns silently.
  • After starting the new thread, it returns without releasing the lock.
  • Unlike method start its thread never calls method _execute. Continuation has to handle contained tasks and only method _cont2 knows how to do that.

Method _cont2

Method _cont2 runs in the thread, that was started from method cont. The state of the task is either STOPPED or TO_STOP.


    def _cont2(self, time_cont: float=None, time_delta: float=None) -> None:
        if self._state == STATE_STOPPED:
            self._state = STATE_TO_CONTINUE
        elif self._state == STATE_TO_STOP:
            self._lock.release()
            self._thread.join()
            self._exc.fire()
            self._lock.acquire()
        if not threading.current_thread() is self._thread_cont:
            self._lock.release()
            return
        if time_cont:
            gap = time_cont - time.time()
            if gap > 0:
                self._activity = ACTIVITY_SLEEP
                self._cond.wait(gap)
                self._activity = ACTIVITY_NONE
                if not threading.current_thread() is self._thread_cont:
                    self._lock.release()
                    return
        if self._restart:
            self._restart = False
            self._actual = self
            self._contained = []
            self._time_action = time.time()
            if self._duration:
                self._time_end = self._time_action + self._duration
            else:
                self._time_end = None
        else:
            if self._action_cont:
                self._action_cont(*self._args_cont, **self._kwargs_cont)
            if not time_cont and not time_delta:
                time_delta = time.time() - self._time_called_stop
            elif not time_delta:
                next_time_action = self.time_action_no_lock
                if next_time_action:
                    time_delta = time.time() - next_time_action
                elif self._time_end:
                    time_delta = time.time() - self._time_called_stop
                else:
                    time_delta = -1
            if self._actual:
                if self._time_action:
                    self._time_action += time_delta
                if self._actual._time_end:
                    self._actual._time_end += time_delta
            elif self._time_end:
                self._time_end += time_delta
        self._state = STATE_STARTED
        self._time_called_stop = None
        self._thread = self._thread_cont
        self._thread_cont = None
        if self._contained:
            for task in self._contained:
                if task._state is STATE_FINISHED:
                    continue
                if not task in self._contained_register or \
                   self._contained_register[task] != self:
                    continue
                task._lock.acquire()
                task._thread_cont = threading.Thread(
                    target=task._cont2,
                    kwargs={'time_cont': time_cont, 'time_delta': time_delta}
                )
                task._thread_cont.start()
            if self._cont_join:
                self._activity = ACTIVITY_JOIN
                self._lock.release()
                self._cont_join.join()
                self._exc.fire()
                self._lock.acquire()
                self._activity = ACTIVITY_NONE
                if self._state != STATE_STARTED:
                    self._final()
                    return
        if self._actual:
            if self._time_action:
                gap = self._time_action - time.time()
                if gap > 0:
                    self._activity = ACTIVITY_SLEEP
                    self._cond.wait(gap)
                    self._activity = ACTIVITY_NONE
                    if self._state != STATE_STARTED:
                        self._final()
                        return
            self._actual._execute()
        else:
            if self._time_end:
                gap = self._time_end  - time.time()
                if gap > 0:
                    self._activity = ACTIVITY_SLEEP
                    self._cond.wait(gap)
                    self._activity = ACTIVITY_NONE
                    if self._state != STATE_STARTED:
                        self._final()
                        return
            self._time_end = None
            self._final()
      
Step by step we go through this code:
  • If the tasks old thread still is executing an action, we wait until it ends.
    
            if self._state == STATE_STOPPED:
                self._state = STATE_TO_CONTINUE
            elif self._state == STATE_TO_STOP:
                self._lock.release()
                self._thread.join()
                self._exc.fire()
                self._lock.acquire()
       
    Joining is time consuming, meanwhile the lock is released.
  • While joining, there could be a call of method stop. If so, it sets _thread_cont = None and another call of cont would creat a new _thread_cont. We control both:
    
            if not threading.current_thread() is self._thread_cont:
                self._lock.release()
                return
       
  • If the method was called with argument gap. We wait this time:
    
            if time_cont:
                gap = time_cont - time.time()
                if gap > 0:
                    self._activity = ACTIVITY_SLEEP
                    self._cond.wait(gap)
                    self._activity = ACTIVITY_NONE
                    if not threading.current_thread() is self._thread_cont:
                        self._lock.release()
                        return
       
  • If there was a call of method start between the first stop and the actual cont, we have to continue with the first link in the chain (the root task) and ignore all contained tasks.
    
            if self._restart:
                self._restart = False
                self._actual = self
                self._contained = []
                self._time_action = time.time()
                if self._duration:
                    self._time_end = self._time_action + self._duration
                else:
                    self._time_end = None
       
  • When there is a special action for continuation, it is called:
    
            else:
                if self._action_cont:
                    self._action_cont(*self._args_cont, **self._kwargs_cont)
       
  • We have to correct all timing by a time shift. This may be the time between the calls of stop and cont or argument gap made it.
    
                if not time_cont and not time_delta:
                    time_delta = time.time() - self._time_called_stop
                elif not time_delta:
                    next_time_action = self.time_action_no_lock
                    if next_time_action:
                        time_delta = time.time() - next_time_action
                    elif self._time_end:
                        time_delta = time.time() - self._time_called_stop
                    else:
                        time_delta = -1
       
    If the task is a contained task, time_delta came in as an argument. Value -1 says, it's not needed, but set. This prevents the contained tasks from repeating the calculation.
  • The correction of the timing:
    
                if self._actual:
                    if self._time_action:
                        self._time_action += time_delta
                    if self._actual._time_end:
                        self._actual._time_end += time_delta
                elif self._time_end:
                    self._time_end += time_delta
       
    Recursion will do the same shift in all contained tasks so that the original synchronisation still will be conserved.
  • The state changes to STARTED and the continuation thread becomes the thread of the task:
    
            self._state = STATE_STARTED
            self._time_called_stop = None
            self._thread = self._thread_cont
            self._thread_cont = None
       
  • All contained tasks have to continue. Their synchronization must be reconstructed. This is done by calling their method _cont2 with keyword argument time_delta:
    
            if self._contained:
                for task in self._contained:
                    if task._state is STATE_FINISHED:
                        continue
                    if not task in self._contained_register or \
                       self._contained_register[task] != self:
                        continue
                    task._lock.acquire()
                    task._thread_cont = threading.Thread(
                        target=task._cont2,
                        kwargs={'time_cont': time_cont, 'time_delta': time_delta}
                    )
                    task._thread_cont.start()
       
  • The task may have been stopped, while joining a contained task. If so, the joining must occur before the tasks next action is executed.
    
                if self._cont_join:
                    self._activity = ACTIVITY_JOIN
                    self._lock.release()
                    self._cont_join.join()
                    self._exc.fire()
                    self._lock.acquire()
                    self._activity = ACTIVITY_NONE
                    if self._state != STATE_STARTED:
                        self._final()
                        return
       
  • The execution of the next action may need another sleeping. If there is no more action, the task has to join its contained tasks. This is done by method _final.
    
            if self._actual:
                if self._time_action:
                    gap = self._time_action - time.time()
                    if gap > 0:
                        self._activity = ACTIVITY_SLEEP
                        self._cond.wait(gap)
                        self._activity = ACTIVITY_NONE
                        if self._state != STATE_STARTED:
                            self._final()
                            return
                self._actual._execute()
            else:
                if self._time_end:
                    gap = self._time_end  - time.time()
                    if gap > 0:
                        self._activity = ACTIVITY_SLEEP
                        self._cond.wait(gap)
                        self._activity = ACTIVITY_NONE
                        if self._state != STATE_STARTED:
                            self._final()
                            return
                self._time_end = None
                self._final()
       

Continuing tasks with long-lasting actions

We test the continuation with this program:


#!/usr/bin/env python3

import task, time, datetime

def action(txt):
    now = datetime.datetime.now().strftime('%H:%M:%S.%f')
    print(now, txt, "begin")
    time.sleep(2)
    now = datetime.datetime.now().strftime('%H:%M:%S.%f')
    print(now, txt, "end")

t = task.concat(
    task.Task(action, args=("first action",)),
    task.Task(action, args=("last action",))
)
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(now, "task created, state:", t.state)
t.start()
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(now, "task started, state:", t.state)
time.sleep(1)
t.stop()
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(now, "task stopped, state:", t.state)
t.cont(gap=2)
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(now, "task continued, state:", t.state)
time.sleep(1)
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(now, "waited 1 sec., state:", t.state)
time.sleep(1)
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(now, "waited 1 sec., state:", t.state)
t.join()
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(now, "task joined, state:", t.state)
      
The output:

18:54:17.673259 task created, state: INIT
18:54:17.674380 first action begin
18:54:17.674556 task started, state: STARTED
18:54:18.675986 task stopped, state: TO_STOP
18:54:18.677085 task continued, state: TO_STOP
18:54:19.676891 first action end
18:54:19.678387 waited 1 sec., state: TO_CONTINUE
18:54:20.677659 last action begin
18:54:20.679900 waited 1 sec., state: STARTED
18:54:22.680142 last action end
18:54:22.680710 task joined, state: FINISHED
      
Looks good, both actions are executed once, the second action begins two sec. after the call of cont, the states are INITSTARTEDTO_STOPTO_CONTINUESTARTEDFINISHED.

Stopping and continuing a song

Again we use class Jukebox to test contained tasks:


#!/usr/bin/env python3

import ev3, ev3_sound, time

jukebox = ev3_sound.Jukebox(
    protocol=ev3.BLUETOOTH,
    host='00:16:53:42:2B:99'
)
jukebox.verbosity = 1
t_song = jukebox.song(ev3_sound.TRIAS)
t_song.start()
time.sleep(1)
t_song.stop()
t_song.cont(2)
      
The output:

20:30:17.552945 Sent 0x|08:00|2A:00|80|00:00|82:1B:03|
20:30:17.557831 Sent 0x|0C:00|2B:00|80|00:00|94:01:01:82:06:01:00|
20:30:18.309089 Sent 0x|0C:00|2C:00|80|00:00|94:01:01:82:4A:01:00|
20:30:18.558362 Sent 0x|08:00|2D:00|80|00:00|82:1B:01|
20:30:18.559895 Sent 0x|07:00|2E:00|80|00:00|94:00|
20:30:20.560389 Sent 0x|0C:00|2F:00|80|00:00|94:01:01:82:88:01:00|
20:30:21.305400 Sent 0x|08:00|30:00|80|00:00|82:1B:05|
20:30:21.310159 Sent 0x|0C:00|31:00|80|00:00|94:01:01:82:0B:02:00|
20:30:23.555294 Sent 0x|08:00|32:00|80|00:00|82:1B:03|
20:30:23.559971 Sent 0x|07:00|33:00|80|00:00|94:00|
20:30:23.561446 Sent 0x|08:00|34:00|80|00:00|82:1B:01|
      
Synchronisation of contained tasks works correct, tones and light are synchrone in the first and the second part of the song.

Stopping and continuing a repeated sound file

Situations, where the algorithm doesn't work as we intent, help to understand the mechanism. One learns more by errors than by anything else. What's our problem? There is no direct command to continue the playing of a sound file! We can stop and restart but not continue. To continue a repeated sound file with a fixed duration, we start it again but with a modified timing (shorter duration).

We test the stopping and continuation:


#!/usr/bin/env python3

import ev3, ev3_sound, time

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1

t = jukebox.sound('./ui/DownloadSucces', duration=5, repeat=True)
t.start()
time.sleep(2)
t.stop()
time.sleep(2)
t.cont()
      
The output:

20:46:58.965333 Sent 0x|1D:00|2A:00|80|00:00|94:03:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
20:47:00.967754 Sent 0x|07:00|2B:00|80|00:00|94:00|
20:47:02.970920 Sent 0x|1D:00|2C:00|80|00:00|94:03:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
20:47:05.968598 Sent 0x|07:00|2D:00|80|00:00|94:00|
      
Looks great!

The problem of the next action

We modify the program slightly and call method cont with argument gap:


#!/usr/bin/env python3

import ev3, ev3_sound, time

jukebox = ev3_sound.Jukebox(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99')
jukebox.verbosity = 1

t = jukebox.sound('./ui/DownloadSucces', duration=5, repeat=True)
t.start()
time.sleep(2)
t.stop()
t.cont(2)
 
The output:

20:51:15.311826 Sent 0x|1D:00|2A:00|80|00:00|94:03:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
20:51:17.314737 Sent 0x|07:00|2B:00|80|00:00|94:00|
20:51:19.315760 Sent 0x|1D:00|2C:00|80|00:00|94:03:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
20:51:19.317129 Sent 0x|07:00|2D:00|80|00:00|94:00|
 
The stopping occurs immediately after the continuation, why? Calling method cont with argument gap says: wait, then continue with the next action. The next action is stop! The callers intention is, that the repeated playing is the action and she does not know, that the playing of sound files can't be continued. We tricksed when we restarted and named it continuation. This trick fails under some special conditions.

The solution: subclassing Task

The solution of the problem lies in subclassing. We modify the continuation and prevent the immediate execution of the next action. The new version of method sound:


    def sound(self, path: str, duration: float=None, repeat: bool=False) -> task.Task:
        ...
        elif repeat:
            class _Task(task.Task):
                def _final(self, **kwargs):
                    super()._final(**kwargs)
                    if self._root._time_action:
                        self._root._time_rest = self._root._time_action - time.time()
                        self._root._time_action -= self._root._time_rest
                def _cont2(self, **kwargs):
                    self._time_action += self._time_rest
                    super()._cont2(**kwargs)

            t_inner = task.concat(
                _Task(
                    self.send_direct_cmd,
                    args=(ops,),
                    duration=duration,
                    action_stop=self.stop,
                    action_cont=self.send_direct_cmd,
                    args_cont=(ops,)
                ),
                _Task(self.stop)
            )
            return task.Task(t_inner.start, join=True)
        ...
      
We shift _time_action backwards from the stopping to the sound continuation. This is what the outer world expects. When continuing, we shift in reverse direction and reconstruct the old situation. If in the meantime somebody asks for property time_action, he gets the shifted value. This will change the behaviour of method cont when argument gap is set. We again start the program. Its output:

21:01:58.678197 Sent 0x|1D:00|2A:00|80|00:00|94:03:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
21:02:00.680940 Sent 0x|07:00|2B:00|80|00:00|94:00|
21:02:02.681971 Sent 0x|1D:00|2C:00|80|00:00|94:03:01:84:2E:2F:75:69:2F:44:6F:77:6E:6C:6F:61:64:53:75:63:63:65:73:00|
21:02:05.682649 Sent 0x|07:00|2D:00|80|00:00|94:00|
      
Well done!

Conclusion

Stopping and continuing tasks is a complex mechanism. This lesson was hard stuff! We had to manage all the timing and synchronization, a lot of technical details. When we look from the outside on the tasks, we don't see all this details and realize the benefits:

  • It provides modularity. We can combine small tasks to medium tasks and medium tasks to complex tasks.
  • It helps to manage high complexity because the compexity of the outside API doesn't grow.
  • It helps for good design. Starting, stopping and continuation seems very natural.
  • It hides the locking and the multithreading mechanism and the synchronization of contained tasks.

We also see the drawbacks:

  • Stopping and continuing need a special design of the tasks. A simple wrapping of a callable into a task object often does not fit our needs. This complicates the coding of the robots actions.
  • Simple things become at least medium complex. Tasks are an abstraction layer, that doesn't keep simple things simple.

I would be glad to get your feedback! Our next lesson will be about class TwoWheelVehicle. We will train it to stop immediately and continue appropriate.