Introduction
This lesson is about the filesystem of the EV3
device, which is a Linux computer with a verly low security
level. We start with a warning: Direct commands (and system commands) have read- and
write-access to the whole filesystem. This says that they
can damage the heart of your EV3
device
and leave it in a state, where is does not boot nor execute
anything. If you have a EV3
device in this damaged
state, you need to boot from a SD-Card and then repair the
filesystem.
The filesystem allows to put resources to the EV3
device and then to reference them in direct commands. Resources
can be byte code, sound, graphic, icons or data. All of them
expand the EV3
's spectrum of actions. Sound files
allow to program speaking robots. Graphic files can be used to
give your robot a face and so on. You know from lesson 2 how to
play sound files, to display graphic files and to start
executable programs. This lesson tells how to copy them from
your local host to the EV3
device or vice versa. We
also learn to create and delete directories or remove files.
Operation opFile
Operation opFile
has 32 different CMDs but some
important are missed. We first look at some of the existing
ones, then we talk about the deficit.
GET_FOLDERS
This command reads the number of subdirectories of a given directory.
opFile = 0x|C0|
with CMDGET_FOLDERS = 0x|0D|
:
Arguments- (Data8) NAME: First character in folder name (Ex. “../prjs/”)
Returns- (Data8) NUMBER: No of sub folders
#!/usr/bin/env python3
import ev3, struct
my_ev3 = ev3.EV3(
protocol=ev3.USB,
host='00:16:53:42:2B:99'
)
my_ev3.verbosity = 1
directory = '/'
ops = b''.join([
ev3.opFile,
ev3.GET_FOLDERS,
ev3.LCS(directory),
ev3.GVX(0)
])
reply = my_ev3.send_direct_cmd(ops, global_mem=1)
num = struct.unpack('<B', reply[5:])[0]
print(
"Directory '{}' has {} subdirectories".format(
directory,
num
)
)
and got this output:
11:46:29.588485 Sent 0x|0B:00|2A:00|00|02:00|C0:0D:84:2F:00:60|
11:46:29.636312 Recv 0x|05:00|2A:00|02|10|
Directory '/' has 16 subdirectories
The root directory of my EV3
device has 16 subdirectories.
GET_SUBFOLDER_NAME
This command reads the name of a subdirectory. The subdirectory is qualified by a number.
opFile = 0x|C0|
with CMDGET_SUBFOLDER_NAME = 0x|0F|
:
Arguments- (Data8) NAME: First character in folder name (Ex. “../prjs/”)
- (Data8) ITEM: Sub folder index [1..ITEMS]
- (Data8) LENGTH: Maximal string length to read
Returns- (Data8) STRING: Fist character of folder name (Character string)
#!/usr/bin/env python3
import ev3, struct
my_ev3 = ev3.EV3(
protocol=ev3.USB,
host='00:16:53:42:2B:99'
)
directory = '/'
ops = b''.join([
ev3.opFile,
ev3.GET_FOLDERS,
ev3.LCS(directory),
ev3.GVX(0)
])
reply = my_ev3.send_direct_cmd(ops, global_mem=1)
num = struct.unpack('<B', reply[5:])[0]
print(
"Directory '{}' has {} subdirectories:".format(
directory,
num
)
)
for i in range(num):
ops = b''.join([
ev3.opFile,
ev3.GET_SUBFOLDER_NAME,
ev3.LCS(directory),
ev3.LCX(i + 1), # ITEM
ev3.LCX(64), # LENGTH
ev3.GVX(0) # NAME
])
reply = my_ev3.send_direct_cmd(ops, global_mem=64)
subdir = struct.unpack('64s', reply[5:])[0]
subdir = subdir.split(b'\x00')[0]
subdir = subdir.decode("utf8")
print(" {}".format(subdir))
and got this output:
Directory '/' has 16 subdirectories:
bin
boot
dev
etc
home
lib
linuxrc
media
mnt
opt
proc
sbin
sys
tmp
usr
var
No doubt, it's Linux! When I looked into directory /home/
I found
one subdirectory: root
. Only one user on the system. Everything,
my EV3
device does, is done with root permissions, even the
execution of the direct commands!
OPEN_READ, READ_BYTES and CLOSE
A file handle is a 8-bit reference, that allows access to a file.
Command OPEN_READ
creates and returns a file handle
and binds it to the file,
command CLOSE
ends the binding of a file handle
and therefore ends the access to the file.
With this knowledge, the plan to read a file is:
OPEN_READ
: Creates a file handle and returns the handle and the size of the file.- multiple
READ_BYTES
: Uses the file handle and reads partitions of data that fit into the global memory of a direct command. CLOSE
: closes the file.
This plan doesn't work! The file handle looses its
binding when the direct command ends. We can read small files,
when we combine an OPEN_READ
and a
READ_BYTES
into a single direct command, but there is no chance
to read large files. We
read the heading 1014 bytes of
file /bin/usb-devices
:
#!/usr/bin/env python3
import ev3, struct
my_ev3 = ev3.EV3(
protocol=ev3.USB,
host='00:16:53:42:2B:99'
)
file_name = "/bin/usb-devices"
ops = b''.join([
ev3.opFile,
ev3.OPEN_READ,
ev3.LCS(file_name), # NAME
ev3.LVX(0), # HANDLE
ev3.GVX(0), # SIZE
ev3.opFile,
ev3.READ_BYTES,
ev3.LVX(0), # HANDLE
ev3.LCX(1014), # BYTES
ev3.GVX(4), # DESTINATION
])
reply = my_ev3.send_direct_cmd(ops, local_mem=1, global_mem=1018)
(size, data) = struct.unpack('<I1014s', reply[5:])
data = data.decode("utf8")
print("File: {}, size: {} bytes".format(file_name, size))
print(data)
The output:
File: /bin/usb-devices, size: 4202 bytes
#!/bin/bash
# Copyright: 2009 Greg Kroah-Hartman <greg@kroah.com>
# 2009 Randy Dunlap <rdunlap@xenotime.net>
# 2009 Frans Pop <elendil@planet.nl>
#
# This software may be used and distributed according to the terms of
# the GNU General Public License (GPL), version 2, or at your option
# any later version.
print_string() {
file=$1
name=$2
if [ -f $file ]; then
echo "S: $name=`cat $file`"
fi
}
class_decode() {
local class=$1 # v4: in hex
case $class in
"00") echo ">ifc " ;;
"01") echo "audio" ;;
"02") echo "commc" ;;
"03") echo "HID " ;;
"05") echo "PID " ;;
"06") echo "still" ;;
"07") echo "print" ;;
"08") echo "stor." ;;
"09") echo "hub " ;;
"0a") echo "data " ;;
"0b") echo "scard" ;;
"0d") echo "c-sec" ;;
"0e") echo "video" ;;
"0f") echo "perhc" ;;
"dc") echo "diagd" ;;
"e0") echo "wlcon" ;;
"ef") echo "misc " ;;
"fe") echo "app. " ;;
"ff") echo "vend." ;;
"*") echo "unk. " ;;
esac
}
print_endpoint() {
local eppath=$1
addr=`cat $eppa
It's a shell-script.
OPEN_WRITE, OPEN_APPEND, WRITE_BYTES
Loosing the handles binding is no problem for writing.
We can start with an OPEN_WRITE
and follow with OPEN_APPEND
,
but there is another problem. Let's look at operation WRITE_BYTES
:
opFile = 0x|C0|
with CMDWRITE_BYTES = 0x|1D|
:
Arguments- (Data8) HANDLE: file handle
- (Data8) BYTES: Number of bytes to write
- (Data8) SOURCE: First byte in byte stream to write
SOURCE
is an address of the local or global memory.
This says we first have to write the data into the memory, then
we can write it into a file. There is no operation to
write a byte stream to the memory, we have to split
it in small units (f.i. operation opMove32_32
allows to write 4 bytes to the memory).
Deficits of direct commands
We found the following deficits:
- Reading large files is not possible because the direct command runs in its own environment and its file handles loose the reference when the execution of a direct command ends.
- Writing large files is not effective because we can't upload large partitions of data.
- There is no operation to read the files in a directory.
System Commands
There are 15 system commands
(decribed in the document
EV3 Communication Developer Kit)
that complete the direct commands.
F.i. the system commands BEGIN_UPLOAD
and
CONTINUE_UPLOAD
allow to read large files. They
correspond to the above decribed situation. The main difference
is the persistence of the file handle. BEGIN_UPLOAD
creates and returns a file handle, which keeps its binding
until CONTINUE_UPLOAD
comes to the files end.
There are some differences between direct commands and system commands:
- A system commands consists of one single command. You can't chain system commands.
- System commands don't use local or global memory, there is no header.
- The structure of system commands is:
- Length of the message (bytes 0, 1)
- Message counter (bytes 2, 3)
- Message type (byte 4): It may have the following two values:
SYSTEM_COMMAND_REPLY = 0x|01|
SYSTEM_COMMAND_NO_REPLY = 0x|81|
- System command with arguments (starting at byte 5): max. 1,019 bytes
- System commands don't use LCS, LCX, LVX, GVX. Their arguments are separated by position. Strings are zero terminated.
- The structure of the reply is:
- Length of the reply (bytes 0, 1)
- Message counter (bytes 2, 3)
- Reply type (byte 4):
SYSTEM_REPLY = 0x|03|
: the system command was successfully operatedSYSTEM_REPLY_ERROR = 0x|05|
: the system command ended with an error.
- Code of system command (byte 5)
- Reply status (byte 6): success, type of error, additional information
- Data (starting at byte 7): The return values of the system command (max. 1,017 bytes).
DELETE_FILE
is an alternative
to operation opFile
with CMD REMOVE
).
Module ev3
We add new methods and data to class EV3 so that it can send system commands to the EV3
device
and receive its replies:
class EV3:
...
def send_system_cmd(self, cmd: bytes, reply: bool=True) -> bytes:
if reply:
cmd_type = _SYSTEM_COMMAND_REPLY
else:
cmd_type = _SYSTEM_COMMAND_NO_REPLY
self._lock.acquire()
if self._msg_cnt < 65535:
self._msg_cnt += 1
else:
self._msg_cnt = 1
msg_cnt = self._msg_cnt
self._lock.release()
cmd = b''.join([
struct.pack('<hh', len(cmd) + 3, msg_cnt),
cmd_type,
cmd
])
if self._verbosity >= 1:
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(now + \
' Sent 0x|' + \
':'.join('{:02X}'.format(byte) for byte in cmd[0:2]) + '|' + \
':'.join('{:02X}'.format(byte) for byte in cmd[2:4]) + '|' + \
':'.join('{:02X}'.format(byte) for byte in cmd[4:5]) + '|' + \
':'.join('{:02X}'.format(byte) for byte in cmd[5:]) + '|' \
)
# pylint: disable=no-member
if self._protocol in [BLUETOOTH, WIFI]:
self._socket.send(cmd)
elif self._protocol is USB:
self._device.write(_EP_OUT, cmd, 100)
else:
raise RuntimeError('No EV3 connected')
# pylint: enable=no-member
counter = cmd[2:4]
if not reply:
return counter
else:
reply = self._wait_for_system_reply(counter)
return reply
def _wait_for_system_reply(self, counter: bytes) -> bytes:
self._lock.acquire()
reply = self._get_foreign_reply(counter)
if reply:
self._lock.release()
if reply[4:5] != _SYSTEM_REPLY:
raise SysCmdError("error: {:02X}".format(reply[6]))
return reply
if self._protocol == BLUETOOTH:
time.sleep(0.1)
while True:
# pylint: disable=no-member
if self._protocol in [BLUETOOTH, WIFI]:
reply = self._socket.recv(1024)
else:
reply = bytes(self._device.read(_EP_IN, 1024, 0))
# pylint: enable=no-member
len_data = struct.unpack('<H', reply[:2])[0] + 2
reply_counter = reply[2:4]
if self._verbosity >= 1:
now = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(now + \
' Recv 0x|' + \
':'.join('{:02X}'.format(byte) for byte in reply[0:2]) + \
'|' + \
':'.join('{:02X}'.format(byte) for byte in reply[2:4]) + \
'|' + \
':'.join('{:02X}'.format(byte) for byte in reply[4:5]) + \
'|' + \
':'.join('{:02X}'.format(byte) for byte in reply[5:6]) + \
'|' + \
':'.join('{:02X}'.format(byte) for byte in reply[6:7]) + \
'|', end='')
if len_data > 7:
dat = ':'.join('{:02X}'.format(byte) for byte in reply[7:len_data])
print(dat + '|')
else:
print()
if counter != reply_counter:
self._put_foreign_reply(reply_counter, reply[:len_data])
else:
self._lock.release()
if reply[4:5] != _SYSTEM_REPLY:
raise SysCmdError("system command replied error: {:02X}".format(reply[6]))
return reply[:len_data]
...
_SYSTEM_COMMAND_REPLY = b'\x01'
_SYSTEM_COMMAND_NO_REPLY = b'\x81'
_SYSTEM_REPLY = b'\x03'
_SYSTEM_REPLY_ERROR = b'\x05'
_SYSTEM_REPLY_OK = b'\x00'
_SYSTEM_UNKNOWN_HANDLE = b'\x01'
_SYSTEM_HANDLE_NOT_READY = b'\x02'
_SYSTEM_CORRUPT_FILE = b'\x03'
_SYSTEM_NO_HANDLES_AVAILABLE = b'\x04'
_SYSTEM_NO_PERMISSION = b'\x05'
_SYSTEM_ILLEGAL_PATH = b'\x06'
_SYSTEM_FILE_EXITS = b'\x07'
_SYSTEM_END_OF_FILE = b'\x08'
_SYSTEM_SIZE_ERROR = b'\x09'
_SYSTEM_UNKNOWN_ERROR = b'\x0A'
_SYSTEM_ILLEGAL_FILENAME = b'\x0B'
_SYSTEM_ILLEGAL_CONNECTION= b'\x0C'
BEGIN_DOWNLOAD = b'\x92'
CONTINUE_DOWNLOAD = b'\x93'
BEGIN_UPLOAD = b'\x94'
CONTINUE_UPLOAD = b'\x95'
BEGIN_GETFILE = b'\x96'
CONTINUE_GETFILE = b'\x97'
CLOSE_FILEHANDLE = b'\x98'
LIST_FILES = b'\x99'
CONTINUE_LIST_FILES = b'\x9A'
CREATE_DIR = b'\x9B'
DELETE_FILE = b'\x9C'
LIST_OPEN_HANDLES = b'\x9D'
WRITEMAILBOX = b'\x9E'
BLUETOOTHPIN = b'\x9F'
ENTERFWUPDATE = b'\xA0'
...
Annotations:
- Method
send_system_cmd
with its internal method_wait_for_system_reply
are modifications of their counterparts for direct commands. - System commands and direct commands use the same message counter and and stack of foreign replies.
- There is no usage of
sync_mode
. Instead methodsend_system_cmd
has an argumentreply
. - Blutooth seems to have problems with fast asking for replies of system commands, therefore we wait.
Reading Files
System commands BEGIN_UPLOAD and CONTINUE_UPLOAD
Reading a large file from EV3
's filesystem needs these two system commands:
BEGIN_UPLOAD = 0x|94|
:
Arguments- (Data16) SIZE: length of the first partition (in bytes, max. 1,012)
- (DataX) NAME: zero-terminated path to the file (including the filename).
If the path doesn't start with a backslash (f.i.
./...
or../...
orui/...
), it's relative from/home/root/lms2012/sys/
.
Returns- (Data32) SIZE: size of the file (in bytes)
- (Data8) HANDLE: file handle
- (DataX) DATA: first partition of data
CONTINUE_UPLOAD = 0x|95|
:
Arguments- (Data8) HANDLE: file handle
- (Data16) SIZE: length of the partition (in bytes, max. 1,016)
Returns- (Data8) HANDLE: file handle
- (DataX) DATA: partition of data
Class FileSystem
We add a new module ev3_file
with a class FileSystem
that reads the data from a file:
#!/usr/bin/env python3
import struct
import ev3
class FileSystem(ev3.EV3):
def write_file(self, path: str, data: bytes) -> None:
size = len(data)
cmd = b''.join([
ev3.BEGIN_DOWNLOAD,
struct.pack('<I', size), # SIZE
str.encode(path) + b'\x00' # NAME
])
reply = self.send_system_cmd(cmd)
handle = struct.unpack('B', reply[7:8])[0]
rest = size
while rest > 0:
part_size = min(1017, rest)
pos = size - rest
fmt = 'B' + str(part_size) + 's'
cmd = b''.join([
ev3.CONTINUE_DOWNLOAD,
struct.pack(fmt, handle, data[pos:pos+part_size]) # HANDLE, DATA
])
self.send_system_cmd(cmd)
rest -= part_size
def read_file(self, path: str) -> bytes:
cmd = b''.join([
ev3.BEGIN_UPLOAD,
struct.pack('<H', 1012), # SIZE
str.encode(path) + b'\x00' # NAME
])
reply = self.send_system_cmd(cmd)
(size, handle) = struct.unpack('<IB', reply[7:12])
part_size = min(1012, size)
if part_size > 0:
fmt = str(part_size) + 's'
data = struct.unpack(fmt, reply[12:])[0]
else:
data = b''
rest = size - part_size
while rest > 0:
part_size = min(1016, rest)
cmd = b''.join([
ev3.CONTINUE_UPLOAD,
struct.pack('<BH', handle, part_size) # HANDLE, SIZE
])
reply = self.send_system_cmd(cmd)
fmt = 'B' + str(part_size) + 's'
(handle, part) = struct.unpack(fmt, reply[7:])
data += part
rest -= part_size
if rest <= 0 and reply[6:7] != ev3.SYSTEM_END_OF_FILE:
raise SysCmdError("end of file not reached")
return data
Test
This helps to read large files. We read file /bin/usb-devices
with this program:
#!/usr/bin/env python3
import ev3, ev3_file, struct, hashlib
file_sys = ev3_file.FileSystem(
protocol=ev3.USB,
host='00:16:53:42:2B:99'
)
file_sys.verbosity = 1
path = "/bin/usb-devices"
data = file_sys.read_file(path)
print(data.decode("utf8"))
print("md5:", hashlib.md5(data).hexdigest().upper())
The output:
09:24:08.466537 Sent 0x|17:00|2A:00|01|94:F4:03:2F:62:69:6E:2F:75:73:62:2D:64:65:76:69:63:65:73:00|
09:24:08.473383 Recv 0x|FE:03|2A:00|03|94|00|6A:10:00:00:00:23:21:2F:62:69:6E:2F:62:61:73:68:0A:0A:...
09:24:08.476595 Sent 0x|07:00|2B:00|01|95:00:F8:03|
09:24:08.479475 Recv 0x|FE:03|2B:00|03|95|00|00:70:61:74:68:2F:62:45:6E:64:70:6F:69:6E:74:41:64:64:...
...
09:24:08.499552 Sent 0x|07:00|2E:00|01|95:00:8E:00|
09:24:08.501479 Recv 0x|94:00|2E:00|03|95|08|00:63:74:6F:72:79:20:2F:73:79:73:2F:62:75:73:20:64:6F:...
#!/bin/bash
# Copyright: 2009 Greg Kroah-Hartman <greg@kroah.com>
# 2009 Randy Dunlap <rdunlap@xenotime.net>
# 2009 Frans Pop <elendil@planet.nl>
#
# This software may be used and distributed according to the terms of
# the GNU General Public License (GPL), version 2, or at your option
# any later version.
print_string() {
file=$1
name=$2
if [ -f $file ]; then
echo "S: $name=`cat $file`"
fi
}
...
for device in /sys/bus/usb/devices/usb*
do
print_device $device 0 0 0
done
md5: 5E78E1B8C0E1E8CB73FDED5DE384C000
The program red the whole file of 4,202 bytes (1,012 + 3 * 1,016 + 142 bytes) and the file handle was 0x|00|
.
I calculated the md5 hash of the file. Wait a little moment and you will realize, why.
Reading Directories
System commands LIST_FILES and CONTINUE_LIST_FILES
Reading a directory needs these two system commands:
LIST_FILES = 0x|99|
:
Arguments- (Data16) SIZE: length of the first partition (in bytes, max. 1,012)
- (DataX) NAME: zero-terminated path to the directory (optionally including a final backslash).
If the path doesn't start with a backslash (f.i.
./...
or../...
orui/...
), it's relative from/home/root/lms2012/sys/
.
Returns- (Data32) SIZE: size of the directory data (in bytes)
- (Data8) HANDLE: file handle
- (DataX) DATA: first partition of data
CONTINUE_LIST_FILES = 0x|9A|
:
Arguments- (Data8) HANDLE: file handle
- (Data16) SIZE: length of the partition (in bytes, max. 1,016)
Returns- (Data8) HANDLE: file handle
- (DataX) DATA: partition of data
The data consist of lines. Every line stands for a subdirectory or a file. Subdirectories are a single string, that ends with a backslash, the line of a file consists of three parts:
- the md5 hash of the file (32 bytes),
- the size of the file, a 4 bytes integer (big endian) written as a string (8 bytes),
- the name of the file.
The data of directory /bin
:
zcat/
watch/
vi/
usleep/
5E78E1B8C0E1E8CB73FDED5DE384C000 0000106A usb-devices
uname/
15C768916AB69D5E49BA2B55984AB644 00008F64 umount.util-linux-ng
umount/
...
You can compare the md5 hash with the one, we calculated above. The md5 hash allows
to control the correctness of the reading and writing.
Adding method list_dir to class FileSystem
We add a method to our class, that reads the content of a directory and returns a dictionary (which is JSON):
def list_dir(self, path: str) -> dict:
cmd = b''.join([
ev3.LIST_FILES,
struct.pack('<H', 1012), # SIZE
str.encode(path) + b'\x00' # NAME
])
reply = self.send_system_cmd(cmd)
(size, handle) = struct.unpack('<IB', reply[7:12])
part_size = min(1012, size)
if part_size > 0:
fmt = str(part_size) + 's'
data = struct.unpack(fmt, reply[12:])[0]
else:
data = b''
rest = size - part_size
while rest > 0:
part_size = min(1016, rest)
cmd = b''.join([
ev3.CONTINUE_LIST_FILES,
struct.pack('<BH', handle, part_size) # HANDLE, SIZE
])
reply = self.send_system_cmd(cmd)
fmt = 'B' + str(part_size) + 's'
(handle, part) = struct.unpack(fmt, reply[7:])
data += part
rest -= part_size
if rest <= 0 and reply[6:7] != ev3.SYSTEM_END_OF_FILE:
raise SysCmdError("end of file not reached")
folders = []
files = []
for line in data.split(sep=b'\x0A'):
if line == b'':
pass
elif line.endswith(b'\x2F'):
folders.append(line.rstrip(b'\x2F').decode("utf8"))
else:
(md5, size_hex, name) = line.split(None, 2)
size = int(size_hex, 16)
files.append({
'md5': md5.decode("utf8"),
'size': size,
'name': name.decode("utf8")
})
return {'files': files, 'folders': folders}
Annotations:
- The first part is very similar to method
read_file
. - The second part creates the dictionary, which is returned.
- The dictionary has the following structure:
folders and files are separated, all information about the files is included, but the size is an integer.{'folders': ['subfolder1', 'subfolder2', ...] 'files': [{'size': 4202, 'name': 'usb-devices', 'md5': '5E78E1B8C0E1E8CB73FDED5DE384C000'}, ...]}
Test
We recursively read directory /home/
with this program:
#!/usr/bin/env python3
import ev3, ev3_file
my_ev3 = ev3_file.FileSystem(
protocol=ev3.USB,
host='00:16:53:42:2B:99'
)
def read_dir(path: str) -> None:
print(path)
try:
content = my_ev3.list_dir(path)
except ev3_file.SysCmdError:
print("error in {}".format(path))
return
for file in content['files']:
print(' '*2 + file['name'])
for folder in content['folders']:
if folder in [
'.',
'..',
]:
pass
else:
read_dir(path + folder + '/')
read_dir('/home/')
The output:
/home/
/home/root/
/home/root/lms2012/
lms2012
/home/root/lms2012/tools/
/home/root/lms2012/tools/WiFi/
icon.rgf
WiFi.rbf
GeneralAlarm.rsf
Connect.rsf
144x99_POP5.rgf
144x82_POP4.rgf
144x65_POP3.rgf
144x48_POP2.rgf
144x116_POP6.rgf
...
/home/root/lms2012/apps/Brick Program/CVS/
This is the part of the filesystem, where all your projects and
their resources are stored. You will find your history with
your EV3
and the dialog structure in these
directories. For details:
Folder Structure.
Writing Files
System commands BEGIN_DOWNLOAD and CONTINUE_DOWNLOAD
Writing data into a file on the EV3
needs two system commands:
BEGIN_DOWNLOAD = 0x|92|
:
Arguments- (Data32) SIZE: length of the file (in bytes)
- (DataX) NAME: zero-terminated path to the file (optionally including a final backslash).
If the path doesn't start with a backslash (f.i.
./...
or../...
orui/...
), it's relative from/home/root/lms2012/sys/
.
Returns- (Data8) HANDLE: file handle
CONTINUE_DOWNLOAD = 0x|93|
:
Arguments- (Data8) HANDLE: file handle
- (DataX) DATA: partition of data (max. 1,018)
Returns- (Data8) HANDLE: file handle
Adding method write_file to class FileSystem
We add a method to our class, that writes data into a file. If the file exists, it replaces it, if not it creates directory and file.:
def write_file(self, path: str, data: bytes) -> None:
size = len(data)
cmd = b''.join([
ev3.BEGIN_DOWNLOAD,
struct.pack('<I', size), # SIZE
str.encode(path) + b'\x00' # NAME
])
reply = self.send_system_cmd(cmd)
handle = struct.unpack('B', reply[7:8])[0]
rest = size
while rest > 0:
part_size = min(1017, rest)
pos = size - rest
fmt = 'B' + str(part_size) + 's'
cmd = b''.join([
ev3.CONTINUE_DOWNLOAD,
struct.pack(fmt, handle, data[pos:pos+part_size]) # HANDLE, DATA
])
self.send_system_cmd(cmd)
rest -= part_size
Annotations:
BEGIN_DOWNLOAD
tests, if the path starts with/home/root/lms2012/apps
,/home/root/lms2012/prjs
or/home/root/lms2012/tools
. This helps for security aspects. If you need to write to another path, use direct commands (opFile
with CMDsOPEN_WRITE
,OPEN_APPEND
,WRITE_BYTES
).- The max. size of a partition is 1,017 instead of 1,018. When I tested the method with the full length, it wrote incomplete files.
- When I used
SYSTEM_COMMAND_NO_REPLY
forCONTINUE_DOWNLOAD
, I got problems (again with incomplete files), maybeUSB
is too fast.
Test
We write a new file, a copy of the sound
file T-rex roar.rsf
and we control the md5 hash:
#!/usr/bin/env python3
import ev3, ev3_file, hashlib
file_sys = ev3_file.FileSystem(
protocol=ev3.USB,
host='00:16:53:42:2B:99'
)
data = open("./T-rex roar.rsf", 'rb').read()
file_sys.write_file("/home/root/lms2012/apps/tmp/T-rex roar.rsf", data)
print(file_sys.list_dir("/home/root/lms2012/apps/tmp"))
print("md5:", hashlib.md5(data).hexdigest().upper())
The output:
{'files': [{'name': 'T-rex roar.rsf', 'md5': 'F4FA8456F6004859FFF34A5667ACD781', 'size': 15521}], 'folders': ['..', '.']}
md5: F4FA8456F6004859FFF34A5667ACD781
Correct md5 hash, writing is o.k.
Deleting Files and Directories
System command DELETE_FILE
Deleting a file is done by the system command:
DELETE_FILE = 0x|9C|
:
Arguments- (DataX) NAME: zero-terminated path to the file.
If the path doesn't start with a backslash (f.i.
./...
or../...
orui/...
), it's relative from/home/root/lms2012/sys/
.
Returns- None
- (DataX) NAME: zero-terminated path to the file.
If the path doesn't start with a backslash (f.i.
Adding method del_file to class FileSystem
We add this method to our class:
def del_file(self, path: str) -> None:
cmd = b''.join([
ev3.DELETE_FILE,
str.encode(path) + b'\x00' # NAME
])
self.send_system_cmd(cmd)
def del_dir(self, path: str, secure: bool=True) -> None:
if secure:
self.del_file(path)
else:
if path.endswith("/"):
path = path[:-1]
parent_path = path.rsplit("/", 1)[0] + "/"
folder = path.rsplit("/", 1)[1]
ops = b''.join([
ev3.opFile,
ev3.GET_FOLDERS,
ev3.LCS(parent_path),
ev3.GVX(0)
])
reply = self.send_direct_cmd(ops, global_mem=1)
num = struct.unpack('B', reply[5:])[0]
found = False
for i in range(num):
ops = b''.join([
ev3.opFile,
ev3.GET_SUBFOLDER_NAME,
ev3.LCS(parent_path),
ev3.LCX(i + 1), # ITEM
ev3.LCX(64), # LENGTH
ev3.GVX(0) # NAME
])
reply = self.send_direct_cmd(ops, global_mem=64)
subdir = struct.unpack('64s', reply[5:])[0]
subdir = subdir.split(b'\x00')[0]
subdir = subdir.decode("utf8")
if subdir == folder:
found = True
ops = b''.join([
ev3.opFile,
ev3.DEL_SUBFOLDER,
ev3.LCS(parent_path), # NAME
ev3.LCX(i + 1) # ITEM
])
self.send_direct_cmd(ops)
break
if not found:
raise ev3.DirCmdError("Folder " + path + " doesn't exist")
Annotations:
del_dir
with argumentsecure=False
uses direct commands and allows to delete directories which are not empty.- The comparison shows, that system commands are comfortable alternatives to direct commands.
Test
We delete the file and directory, we have created:
#!/usr/bin/env python3
import ev3, ev3_file, hashlib
file_sys = ev3_file.FileSystem(
protocol=ev3.USB,
host='00:16:53:42:2B:99'
)
file_sys.del_file("/home/root/lms2012/apps/tmp/T-rex roar.rsf")
file_sys.del_dir("/home/root/lms2012/apps/tmp")
print(file_sys.list_dir("/home/root/lms2012/apps"))
The output:
{'folders': ['Motor Control', 'Port View', 'IR Control', 'Brick Program', '..', '.'], 'files': ...
App tmp
disappeared, o.k.
Loading resources and using them
It's time to put the things together. We start a little program, that loads a sound file
and an graphic file to the EV3
,
uses and deletes them:
#!/usr/bin/env python3
import ev3, ev3_file, ev3_sound, task
file_sys = ev3_file.FileSystem(
protocol=ev3.USB,
host='00:16:53:42:2B:99'
)
jukebox = ev3_sound.Jukebox(ev3_obj=file_sys)
jukebox.volume=100
path_app = "../apps/tmp"
sound_local = "./Thank you.rsf"
sound_ev3 = path_app + "/Thank you.rsf"
image_local = "./Smile.rgf"
image_ev3 = path_app + "/Smile.rgf"
def load_resources() -> None:
data_sound = open(sound_local, 'rb').read()
file_sys.write_file(sound_ev3, data_sound)
data_image = open(image_local, 'rb').read()
file_sys.write_file(image_ev3, data_image)
def remove_resources() -> None:
file_sys.del_dir(path_app, secure=False)
ops_smile = b''.join([
ev3.opUI_Draw,
ev3.BMPFILE,
ev3.LCX(1), # COLOR
ev3.LCX(0), # X0
ev3.LCX(0), # Y0
ev3.LCS(image_ev3), # NAME
ev3.opUI_Draw,
ev3.UPDATE
])
ops_empty = b''.join([
ev3.opUI_Draw,
ev3.FILLWINDOW,
ev3.LCX(0), # COLOR
ev3.LCX(0), # Y0
ev3.LCX(0), # Y1
ev3.opUI_Draw,
ev3.UPDATE
])
t = task.concat(
task.Task(
load_resources,
action_stop=remove_resources
),
task.Task(
file_sys.send_direct_cmd,
args=(ops_smile,)
),
jukebox.sound(sound_ev3.rsplit('.', 1)[0], duration=0.7),
task.Task(
file_sys.send_direct_cmd,
args=(ops_empty,),
duration=0.01
),
task.Task(remove_resources)
)
t.start()
Remarks:
- This sound file is part of the LEGO MINDSTORMS EV3 Home Edition. You find file
Thank you.rsf
in.../LEGO MINDSTORMS EV3 Home Edition/Resources/BrickResources/Retail/Sounds/files/Communication
. - Dito the grahic file
Smile.rgf
. You find it in.../LEGO MINDSTORMS EV3 Home Edition/Resources/BrickResources/Retail/Images/files/Expressions
. - We create a temporary folder
/home/root/lms2012/apps/tmp
, where we load the resources in. - This folder and its content are removed at the end of the task.
- Stopping the task, also removes the temporary resources.
Conclusion
We coded another subclass of EV3
and added module ev3_file
which has the following API:
Help on module ev3_file:
NAME
ev3_file - access on EV3's filesystem
CLASSES
builtins.Exception(builtins.BaseException)
SysCmdError
ev3.EV3(builtins.object)
FileSystem
class FileSystem(ev3.EV3)
| Works with EV3's filesystem
|
| Method resolution order:
| FileSystem
| ev3.EV3
| builtins.object
|
| Methods defined here:
|
| copy_file(self, path_source:str, path_dest:str) -> None
| Copies a file in the EV3's file system from
| its old location to a new one
| (no error if the file doesn't exist)
|
| Attributes:
| path_source: absolute or relative path (from "/home/root/lms2012/sys/") of the existing file
| path_dest: absolute or relative path of the new file
|
| create_dir(self, path:str) -> None
| Create a directory on EV3's file system
|
| Attributes:
| path: absolute or relative path (from "/home/root/lms2012/sys/")
|
| del_dir(self, path:str, secure:bool=True) -> None
| Delete a directory on EV3's file system
|
| Attributes:
| path: absolute or relative path (from "/home/root/lms2012/sys/")
| secure: flag, if the directory may be not empty
|
| del_file(self, path:str) -> None
| Delete a file from the EV3's file system
|
| Attributes:
| path: absolute or relative path (from "/home/root/lms2012/sys/") of the file
|
| list_dir(self, path:str) -> dict
| Read one of EV3's directories
|
| Attributes:
| path: absolute or relative path to the directory (f.i. "/bin")
|
| Returns:
| dict, that holds subfolders and files.
| folders as an array of strings (names)
| files as an array of dictionaries
| {'folders': ['subfolder1', 'subfolder2', ...]
| 'files': [{'size': 4202,
| 'name': 'usb-devices',
| 'md5': '5E78E1B8C0E1E8CB73FDED5DE384C000'}, ...]}
|
| read_file(self, path:str) -> bytes
| Read one of EV3's files
|
| Attributes:
| path: absolute or relative path to file (f.i. "/bin/sh")
|
| send_system_cmd(self, cmd:bytes, reply:bool=True) -> bytes
| Send a system command to the LEGO EV3
|
| Arguments:
| cmd: holds netto data only (cmd and arguments), the following fields are added:
| length: 2 bytes, little endian
| counter: 2 bytes, little endian
| type: 1 byte, SYSTEM_COMMAND_REPLY or SYSTEM_COMMAND_NO_REPLY
|
| Keywor Arguments:
| reply: flag if with reply
|
| Returns:
| reply (in case of SYSTEM_COMMAND_NO_REPLY: counter)
|
| write_file(self, path:str, data:bytes) -> None
| Write a file to the EV3's file system
|
| Attributes:
| path: absolute or relative path (from "/home/root/lms2012/sys/") of the file
| data: data to write into the file
|
| ----------------------------------------------------------------------
| Methods inherited from ev3.EV3:
|
...
This allows to work with resources. We have picked a few of them from the LEGO MINDSTORMS EV3 Home Edition and it's obvious, that a large collection of resources allows great applications. Let's only think of a speaking robot with a real vocabulary.
If you need to convert grahic or sound to the format,
the EV3
understands, take a look at
ImageMagick to create graphic files or
wavrsocvt to create sound files.