Files
micropython/tests/extmod/vfs_rom.py
Damien George 487c94c253 tests/extmod/vfs_rom.py: Clear sys.path before running test.
Otherwise if the target has certain files/directories (such as "test") in
its filesystem then these interfere with the unit tests.

Signed-off-by: Damien George <damien@micropython.org>
2025-05-12 13:33:42 +10:00

490 lines
17 KiB
Python

# Test VfsRom filesystem.
try:
import errno, sys, struct, os, uctypes, vfs
vfs.VfsRom
except (ImportError, AttributeError):
print("SKIP")
raise SystemExit
try:
import select
except ImportError:
select = None
import unittest
IFDIR = 0x4000
IFREG = 0x8000
SEEK_SET = 0
SEEK_CUR = 1
SEEK_END = 2
# An mpy file with four constant objects: str, bytes, long-int, float.
test_mpy = (
# header
b"M\x06\x00\x1f" # mpy file header
b"\x06" # n_qstr
b"\x05" # n_obj
# qstrs
b"\x0etest.py\x00" # qstr0 = "test.py"
b"\x0f" # qstr1 = "<module>"
b"\x0estr_obj\x00" # qstr2 = "str_obj"
b"\x12bytes_obj\x00" # qstr3 = "bytes_obj"
b"\x0eint_obj\x00" # qstr4 = "int_obj"
b"\x12float_obj\x00" # qstr5 = "float_obj"
# objects
b"\x05\x14this is a str object\x00"
b"\x06\x16this is a bytes object\x00"
b"\x07\x0a1234567890" # long-int object
b"\x08\x041.23" # float object
b"\x05\x07str_obj\x00" # str object of existing qstr
# bytecode
b"\x81\x28" # 21 bytes, no children, bytecode
b"\x00\x02" # prelude
b"\x01" # simple name (<module>)
b"\x23\x00" # LOAD_CONST_OBJ(0)
b"\x16\x02" # STORE_NAME(str_obj)
b"\x23\x01" # LOAD_CONST_OBJ(1)
b"\x16\x03" # STORE_NAME(bytes_obj)
b"\x23\x02" # LOAD_CONST_OBJ(2)
b"\x16\x04" # STORE_NAME(int_obj)
b"\x23\x03" # LOAD_CONST_OBJ(3)
b"\x16\x05" # STORE_NAME(float_obj)
b"\x51" # LOAD_CONST_NONE
b"\x63" # RETURN_VALUE
)
class VfsRomWriter:
ROMFS_HEADER = b"\xd2\xcd\x31"
ROMFS_RECORD_KIND_UNUSED = 0
ROMFS_RECORD_KIND_PADDING = 1
ROMFS_RECORD_KIND_DATA_VERBATIM = 2
ROMFS_RECORD_KIND_DATA_POINTER = 3
ROMFS_RECORD_KIND_DIRECTORY = 4
ROMFS_RECORD_KIND_FILE = 5
def __init__(self):
self._dir_stack = [(None, bytearray())]
def _encode_uint(self, value):
encoded = [value & 0x7F]
value >>= 7
while value != 0:
encoded.insert(0, 0x80 | (value & 0x7F))
value >>= 7
return bytes(encoded)
def _pack(self, kind, payload):
return self._encode_uint(kind) + self._encode_uint(len(payload)) + payload
def _extend(self, data):
buf = self._dir_stack[-1][1]
buf.extend(data)
return len(buf)
def finalise(self):
_, data = self._dir_stack.pop()
encoded_kind = VfsRomWriter.ROMFS_HEADER
encoded_len = self._encode_uint(len(data))
if (len(encoded_kind) + len(encoded_len) + len(data)) % 2 == 1:
encoded_len = b"\x80" + encoded_len
data = encoded_kind + encoded_len + data
return data
def opendir(self, dirname):
self._dir_stack.append((dirname, bytearray()))
def closedir(self):
dirname, dirdata = self._dir_stack.pop()
dirdata = self._encode_uint(len(dirname)) + bytes(dirname, "ascii") + dirdata
self._extend(self._pack(VfsRomWriter.ROMFS_RECORD_KIND_DIRECTORY, dirdata))
def mkdata(self, data):
assert len(self._dir_stack) == 1
return self._extend(self._pack(VfsRomWriter.ROMFS_RECORD_KIND_DATA_VERBATIM, data)) - len(
data
)
def mkfile(self, filename, filedata, extra_payload=b""):
filename = bytes(filename, "ascii")
payload = self._encode_uint(len(filename))
payload += filename
payload += extra_payload
if isinstance(filedata, tuple):
sub_payload = self._encode_uint(filedata[0])
sub_payload += self._encode_uint(filedata[1])
payload += self._pack(VfsRomWriter.ROMFS_RECORD_KIND_DATA_POINTER, sub_payload)
else:
payload += self._pack(VfsRomWriter.ROMFS_RECORD_KIND_DATA_VERBATIM, filedata)
self._dir_stack[-1][1].extend(self._pack(VfsRomWriter.ROMFS_RECORD_KIND_FILE, payload))
def _make_romfs(fs, files, data_map):
for filename, contents in files:
if isinstance(contents, tuple):
fs.opendir(filename)
_make_romfs(fs, contents, data_map)
fs.closedir()
elif isinstance(contents, int):
fs.mkfile(filename, data_map[contents])
else:
fs.mkfile(filename, contents)
def make_romfs(files, data=None):
fs = VfsRomWriter()
data_map = {}
if data:
for k, v in data.items():
data_map[k] = len(v), fs.mkdata(v)
_make_romfs(fs, files, data_map)
return fs.finalise()
# A class to test if a value is within a range, needed because MicroPython's range
# doesn't support arbitrary objects.
class Range:
def __init__(self, lower, upper):
self._lower = lower
self._upper = upper
def __repr__(self):
return "Range({}, {})".format(self._lower, self._upper)
def __contains__(self, value):
return self._lower <= value < self._upper
class TestBase(unittest.TestCase):
@classmethod
def setUpClass(cls):
fs_inner = make_romfs((("test_inner.txt", b"contents_inner"), ("c.py", b"")))
cls.romfs = make_romfs(
(
("fs.romfs", 0),
("test.txt", b"contents"),
(
"dir",
(
("a.py", b"x = 1"),
("b.py", b"x = 2"),
("test.mpy", test_mpy),
),
),
),
{0: fs_inner},
)
cls.romfs_ilistdir = [
("fs.romfs", IFREG, 0, 46),
("test.txt", IFREG, 0, 8),
("dir", IFDIR, 0, 198),
]
cls.romfs_listdir = [x[0] for x in cls.romfs_ilistdir]
cls.romfs_listdir_dir = ["a.py", "b.py", "test.mpy"]
cls.romfs_listdir_bytes = [bytes(x, "ascii") for x in cls.romfs_listdir]
cls.romfs_addr = uctypes.addressof(cls.romfs)
cls.romfs_addr_range = Range(cls.romfs_addr, cls.romfs_addr + len(cls.romfs))
class TestEdgeCases(unittest.TestCase):
def test_empty_romfs(self):
empty_romfs = make_romfs(())
self.assertEqual(empty_romfs, bytes([0x80 | ord("R"), 0x80 | ord("M"), ord("1"), 0]))
fs = vfs.VfsRom(empty_romfs)
self.assertIsInstance(fs, vfs.VfsRom)
fs.mount(True, False)
self.assertEqual(list(fs.ilistdir("")), [])
self.assertEqual(fs.stat(""), (IFDIR, 0, 0, 0, 0, 0, 0, 0, 0, 0))
self.assertEqual(fs.statvfs(""), (1, 0, 0, 0, 0, 0, 0, 0, 0, 32767))
def test_error(self):
for bad_romfs in (b"", b"xxx", b"not a romfs"):
with self.assertRaises(OSError) as ctx:
vfs.VfsRom(bad_romfs)
self.assertEqual(ctx.exception.errno, errno.ENODEV)
def test_unknown_record(self):
fs = VfsRomWriter()
fs._extend(fs._pack(VfsRomWriter.ROMFS_RECORD_KIND_PADDING, b"payload"))
fs.mkfile(
"test",
b"contents",
extra_payload=fs._pack(VfsRomWriter.ROMFS_RECORD_KIND_PADDING, b"pad"),
)
romfs = fs.finalise()
fs = vfs.VfsRom(romfs)
self.assertEqual(list(fs.ilistdir("")), [("test", IFREG, 0, 8)])
with fs.open("test", "rb") as f:
self.assertEqual(f.read(), b"contents")
class TestCorrupt(unittest.TestCase):
def test_corrupt_filesystem(self):
# Make the filesystem length bigger than the buffer.
romfs = bytearray(make_romfs(()))
romfs[3] = 0x01
with self.assertRaises(OSError):
vfs.VfsRom(romfs)
# Corrupt the filesystem length.
romfs = bytearray(make_romfs(()))
romfs[3] = 0xFF
with self.assertRaises(OSError):
vfs.VfsRom(romfs)
# Corrupt the contents of the filesystem.
romfs = bytearray(make_romfs(()))
romfs[3] = 0x01
romfs.extend(b"\xff\xff")
fs = vfs.VfsRom(romfs)
with self.assertRaises(OSError):
fs.stat("a")
self.assertEqual(list(fs.ilistdir("")), [])
def test_corrupt_file_entry(self):
romfs = make_romfs((("file", b"data"),))
# Corrupt the length of filename.
romfs_corrupt = bytearray(romfs)
romfs_corrupt[7:] = b"\xff" * (len(romfs) - 7)
fs = vfs.VfsRom(romfs_corrupt)
with self.assertRaises(OSError):
fs.stat("file")
self.assertEqual(list(fs.ilistdir("")), [])
# Erase the data record (change it to a padding record).
romfs_corrupt = bytearray(romfs)
romfs_corrupt[12] = VfsRomWriter.ROMFS_RECORD_KIND_PADDING
fs = vfs.VfsRom(romfs_corrupt)
with self.assertRaises(OSError):
fs.stat("file")
self.assertEqual(list(fs.ilistdir("")), [])
# Corrupt the header of the data record.
romfs_corrupt = bytearray(romfs)
romfs_corrupt[12:] = b"\xff" * (len(romfs) - 12)
fs = vfs.VfsRom(romfs_corrupt)
with self.assertRaises(OSError):
fs.stat("file")
# Corrupt the interior of the data record.
romfs_corrupt = bytearray(romfs)
romfs_corrupt[13:] = b"\xff" * (len(romfs) - 13)
fs = vfs.VfsRom(romfs_corrupt)
with self.assertRaises(OSError):
fs.stat("file")
# Change the data record to an indirect pointer and corrupt the length.
romfs_corrupt = bytearray(romfs)
romfs_corrupt[12] = VfsRomWriter.ROMFS_RECORD_KIND_DATA_POINTER
romfs_corrupt[14:18] = b"\xff\xff\xff\xff"
fs = vfs.VfsRom(romfs_corrupt)
with self.assertRaises(OSError):
fs.stat("file")
# Change the data record to an indirect pointer and corrupt the offset.
romfs_corrupt = bytearray(romfs)
romfs_corrupt[12] = VfsRomWriter.ROMFS_RECORD_KIND_DATA_POINTER
romfs_corrupt[14:18] = b"\x00\xff\xff\xff"
fs = vfs.VfsRom(romfs_corrupt)
with self.assertRaises(OSError):
fs.stat("file")
class TestStandalone(TestBase):
def test_constructor(self):
self.assertIsInstance(vfs.VfsRom(self.romfs), vfs.VfsRom)
with self.assertRaises(TypeError):
vfs.VfsRom(self.romfs_addr)
def test_mount(self):
vfs.VfsRom(self.romfs).mount(True, False)
with self.assertRaises(OSError):
vfs.VfsRom(self.romfs).mount(True, True)
def test_ilistdir(self):
fs = vfs.VfsRom(self.romfs)
self.assertEqual(list(fs.ilistdir("")), self.romfs_ilistdir)
self.assertEqual(list(fs.ilistdir("/")), self.romfs_ilistdir)
with self.assertRaises(OSError):
fs.ilistdir("does not exist")
def test_stat(self):
fs = vfs.VfsRom(self.romfs)
self.assertEqual(fs.stat(""), (IFDIR, 0, 0, 0, 0, 0, 289, 0, 0, 0))
self.assertEqual(fs.stat("/"), (IFDIR, 0, 0, 0, 0, 0, 289, 0, 0, 0))
self.assertEqual(fs.stat("/test.txt"), (IFREG, 0, 0, 0, 0, 0, 8, 0, 0, 0))
self.assertEqual(fs.stat("/dir"), (IFDIR, 0, 0, 0, 0, 0, 198, 0, 0, 0))
with self.assertRaises(OSError):
fs.stat("/does-not-exist")
def test_statvfs(self):
fs = vfs.VfsRom(self.romfs)
self.assertEqual(fs.statvfs(""), (1, 0, 289, 0, 0, 0, 0, 0, 0, 32767))
def test_open(self):
fs = vfs.VfsRom(self.romfs)
with fs.open("/test.txt", "") as f:
self.assertEqual(f.read(), "contents")
with fs.open("/test.txt", "rt") as f:
self.assertEqual(f.read(), "contents")
with fs.open("/test.txt", "rb") as f:
self.assertEqual(f.read(), b"contents")
with self.assertRaises(OSError) as ctx:
fs.open("/file-does-not-exist", "")
self.assertEqual(ctx.exception.errno, errno.ENOENT)
with self.assertRaises(OSError) as ctx:
fs.open("/dir", "rb")
self.assertEqual(ctx.exception.errno, errno.EISDIR)
with self.assertRaises(OSError):
fs.open("/test.txt", "w")
with self.assertRaises(OSError):
fs.open("/test.txt", "a")
with self.assertRaises(OSError):
fs.open("/test.txt", "+")
def test_file_seek(self):
fs = vfs.VfsRom(self.romfs)
with fs.open("/test.txt", "") as f:
self.assertEqual(f.seek(0, SEEK_SET), 0)
self.assertEqual(f.seek(3, SEEK_SET), 3)
self.assertEqual(f.read(), "tents")
self.assertEqual(f.seek(0, SEEK_SET), 0)
self.assertEqual(f.seek(100, SEEK_CUR), 8)
self.assertEqual(f.seek(-1, SEEK_END), 7)
self.assertEqual(f.read(), "s")
self.assertEqual(f.seek(1, SEEK_END), 8)
with self.assertRaises(OSError):
f.seek(-1, SEEK_SET)
f.seek(0, SEEK_SET)
with self.assertRaises(OSError):
f.seek(-1, SEEK_CUR)
with self.assertRaises(OSError):
f.seek(-100, SEEK_END)
@unittest.skipIf(select is None, "no select module")
def test_file_ioctl_invalid(self):
fs = vfs.VfsRom(self.romfs)
with fs.open("/test.txt", "") as f:
p = select.poll()
p.register(f)
with self.assertRaises(OSError):
p.poll(0)
def test_memory_mapping(self):
fs = vfs.VfsRom(self.romfs)
with fs.open("/test.txt", "rb") as f:
addr = uctypes.addressof(f)
data = memoryview(f)
self.assertIn(addr, self.romfs_addr_range)
self.assertIn(addr + len(data), self.romfs_addr_range)
self.assertEqual(bytes(data), b"contents")
class TestMounted(TestBase):
def setUp(self):
self.orig_sys_path = list(sys.path)
self.orig_cwd = os.getcwd()
sys.path = []
vfs.mount(vfs.VfsRom(self.romfs), "/test_rom")
def tearDown(self):
vfs.umount("/test_rom")
os.chdir(self.orig_cwd)
sys.path = self.orig_sys_path
def test_listdir(self):
self.assertEqual(os.listdir("/test_rom"), self.romfs_listdir)
self.assertEqual(os.listdir("/test_rom/dir"), self.romfs_listdir_dir)
self.assertEqual(os.listdir(b"/test_rom"), self.romfs_listdir_bytes)
def test_chdir(self):
os.chdir("/test_rom")
self.assertEqual(os.getcwd(), "/test_rom")
self.assertEqual(os.listdir(), self.romfs_listdir)
os.chdir("/test_rom/")
self.assertEqual(os.getcwd(), "/test_rom")
self.assertEqual(os.listdir(), self.romfs_listdir)
# chdir within the romfs is not implemented.
with self.assertRaises(OSError):
os.chdir("/test_rom/dir")
def test_stat(self):
self.assertEqual(os.stat("/test_rom"), (IFDIR, 0, 0, 0, 0, 0, 289, 0, 0, 0))
self.assertEqual(os.stat("/test_rom/"), (IFDIR, 0, 0, 0, 0, 0, 289, 0, 0, 0))
self.assertEqual(os.stat("/test_rom/test.txt"), (IFREG, 0, 0, 0, 0, 0, 8, 0, 0, 0))
self.assertEqual(os.stat("/test_rom/dir"), (IFDIR, 0, 0, 0, 0, 0, 198, 0, 0, 0))
with self.assertRaises(OSError):
os.stat("/test_rom/does-not-exist")
def test_open(self):
with open("/test_rom/test.txt") as f:
self.assertEqual(f.read(), "contents")
with open("/test_rom/test.txt", "b") as f:
self.assertEqual(f.read(), b"contents")
with self.assertRaises(OSError) as ctx:
open("/test_rom/file-does-not-exist")
self.assertEqual(ctx.exception.errno, errno.ENOENT)
with self.assertRaises(OSError) as ctx:
open("/test_rom/dir")
self.assertEqual(ctx.exception.errno, errno.EISDIR)
def test_import_py(self):
sys.path.append("/test_rom/dir")
a = __import__("a")
b = __import__("b")
self.assertEqual(a.__file__, "/test_rom/dir/a.py")
self.assertEqual(a.x, 1)
self.assertEqual(b.__file__, "/test_rom/dir/b.py")
self.assertEqual(b.x, 2)
def test_import_mpy(self):
sys.path.append("/test_rom/dir")
test = __import__("test")
self.assertEqual(test.__file__, "/test_rom/dir/test.mpy")
self.assertEqual(test.str_obj, "this is a str object")
self.assertEqual(test.bytes_obj, b"this is a bytes object")
self.assertEqual(test.int_obj, 1234567890)
self.assertEqual(test.float_obj, 1.23)
self.assertIn(uctypes.addressof(test.str_obj), self.romfs_addr_range)
self.assertIn(uctypes.addressof(test.bytes_obj), self.romfs_addr_range)
def test_romfs_inner(self):
with open("/test_rom/fs.romfs", "rb") as f:
romfs_inner = vfs.VfsRom(memoryview(f))
vfs.mount(romfs_inner, "/test_rom2")
self.assertEqual(os.listdir("/test_rom2"), ["test_inner.txt", "c.py"])
sys.path.append("/test_rom2")
self.assertEqual(__import__("c").__file__, "/test_rom2/c.py")
with open("/test_rom2/test_inner.txt") as f:
self.assertEqual(f.read(), "contents_inner")
with open("/test_rom2/test_inner.txt", "rb") as f:
addr = uctypes.addressof(f)
data = memoryview(f)
self.assertIn(addr, self.romfs_addr_range)
self.assertIn(addr + len(data), self.romfs_addr_range)
vfs.umount("/test_rom2")
if __name__ == "__main__":
unittest.main()