mirror of
https://github.com/micropython/micropython.git
synced 2025-09-07 10:20:52 +02:00
Some checks are pending
JavaScript code lint and formatting with Biome / eslint (push) Waiting to run
Check code formatting / code-formatting (push) Waiting to run
Check spelling with codespell / codespell (push) Waiting to run
Build docs / build (push) Waiting to run
Check examples / embedding (push) Waiting to run
Package mpremote / build (push) Waiting to run
.mpy file format and tools / test (push) Waiting to run
Build ports metadata / build (push) Waiting to run
cc3200 port / build (push) Waiting to run
esp32 port / build_idf (esp32_build_cmod_spiram_s2) (push) Waiting to run
esp32 port / build_idf (esp32_build_s3_c3) (push) Waiting to run
esp8266 port / build (push) Waiting to run
mimxrt port / build (push) Waiting to run
nrf port / build (push) Waiting to run
powerpc port / build (push) Waiting to run
qemu port / build_and_test_arm (push) Waiting to run
qemu port / build_and_test_rv32 (push) Waiting to run
renesas-ra port / build_renesas_ra_board (push) Waiting to run
rp2 port / build (push) Waiting to run
samd port / build (push) Waiting to run
stm32 port / build_stm32 (stm32_misc_build) (push) Waiting to run
stm32 port / build_stm32 (stm32_nucleo_build) (push) Waiting to run
stm32 port / build_stm32 (stm32_pyb_build) (push) Waiting to run
unix port / minimal (push) Waiting to run
unix port / reproducible (push) Waiting to run
unix port / standard (push) Waiting to run
unix port / standard_v2 (push) Waiting to run
unix port / coverage (push) Waiting to run
unix port / coverage_32bit (push) Waiting to run
unix port / nanbox (push) Waiting to run
unix port / float (push) Waiting to run
unix port / stackless_clang (push) Waiting to run
unix port / float_clang (push) Waiting to run
unix port / settrace (push) Waiting to run
unix port / settrace_stackless (push) Waiting to run
unix port / macos (push) Waiting to run
unix port / qemu_mips (push) Waiting to run
unix port / qemu_arm (push) Waiting to run
unix port / qemu_riscv64 (push) Waiting to run
webassembly port / build (push) Waiting to run
windows port / build-vs (Debug, x64, windows-2022, dev, 2022, [17, 18)) (push) Waiting to run
windows port / build-vs (Debug, x64, windows-latest, dev, 2017, [15, 16)) (push) Waiting to run
windows port / build-vs (Debug, x86, windows-2022, dev, 2022, [17, 18)) (push) Waiting to run
windows port / build-vs (Debug, x86, windows-latest, dev, 2017, [15, 16)) (push) Waiting to run
windows port / build-vs (Release, x64, windows-2019, dev, 2019, [16, 17)) (push) Waiting to run
windows port / build-vs (Release, x64, windows-2019, standard, 2019, [16, 17)) (push) Waiting to run
windows port / build-vs (Release, x64, windows-2022, dev, 2022, [17, 18)) (push) Waiting to run
windows port / build-vs (Release, x64, windows-2022, standard, 2022, [17, 18)) (push) Waiting to run
windows port / build-vs (Release, x64, windows-latest, dev, 2017, [15, 16)) (push) Waiting to run
windows port / build-vs (Release, x64, windows-latest, standard, 2017, [15, 16)) (push) Waiting to run
windows port / build-vs (Release, x86, windows-2019, dev, 2019, [16, 17)) (push) Waiting to run
windows port / build-vs (Release, x86, windows-2019, standard, 2019, [16, 17)) (push) Waiting to run
windows port / build-vs (Release, x86, windows-2022, dev, 2022, [17, 18)) (push) Waiting to run
windows port / build-vs (Release, x86, windows-2022, standard, 2022, [17, 18)) (push) Waiting to run
windows port / build-vs (Release, x86, windows-latest, dev, 2017, [15, 16)) (push) Waiting to run
windows port / build-vs (Release, x86, windows-latest, standard, 2017, [15, 16)) (push) Waiting to run
windows port / build-mingw (i686, mingw32, dev) (push) Waiting to run
windows port / build-mingw (i686, mingw32, standard) (push) Waiting to run
windows port / build-mingw (x86_64, mingw64, dev) (push) Waiting to run
windows port / build-mingw (x86_64, mingw64, standard) (push) Waiting to run
windows port / cross-build-on-linux (push) Waiting to run
zephyr port / build (push) Waiting to run
Python code lint and formatting with ruff / ruff (push) Waiting to run
Testing with ROMFS shows that it is relatively easy to end up with a corrupt filesystem on the device -- eg due to the ROMFS deploy process stopping half way through -- which could lead to hard crashes. Notably, there can be boot loops trying to mount a corrupt filesystem, crashes when importing modules like `os` that first scan the filesystem for `os.py`, and crashes when deploying a new ROMFS in certain cases because the old one is removed while still mounted. The main problem is that `mp_decode_uint()` has a loop that keeps going as long as it reads 0xff byte values, which can happen in the case of erased and unwritten flash. This commit adds full bounds checking in the new `mp_decode_uint_checked()` function, and that makes all ROMFS filesystem accesses robust. Signed-off-by: Damien George <damien@micropython.org>
487 lines
17 KiB
Python
487 lines
17 KiB
Python
# Test VfsRom filesystem.
|
|
|
|
try:
|
|
import errno, sys, struct, os, uctypes, vfs
|
|
|
|
vfs.VfsRom
|
|
except (ImportError, AttributeError):
|
|
print("SKIP")
|
|
raise SystemExit
|
|
|
|
try:
|
|
import select
|
|
except ImportError:
|
|
select = None
|
|
|
|
import unittest
|
|
|
|
# File-type bits that the tests expect in the first field of stat() results
# and in the second field of ilistdir() entries.
IFDIR = 0x4000
IFREG = 0x8000

# Values passed as the second argument of file.seek().
SEEK_SET = 0
SEEK_CUR = 1
SEEK_END = 2

# An mpy file with five constant objects: str, bytes, long-int, float, and a
# str object whose value is the same as an existing qstr.
test_mpy = (
    # header
    b"M\x06\x00\x1f"  # mpy file header
    b"\x06"  # n_qstr
    b"\x05"  # n_obj
    # qstrs
    b"\x0etest.py\x00"  # qstr0 = "test.py"
    b"\x0f"  # qstr1 = "<module>"
    b"\x0estr_obj\x00"  # qstr2 = "str_obj"
    b"\x12bytes_obj\x00"  # qstr3 = "bytes_obj"
    b"\x0eint_obj\x00"  # qstr4 = "int_obj"
    b"\x12float_obj\x00"  # qstr5 = "float_obj"
    # objects
    b"\x05\x14this is a str object\x00"
    b"\x06\x16this is a bytes object\x00"
    b"\x07\x0a1234567890"  # long-int object
    b"\x08\x041.23"  # float object
    b"\x05\x07str_obj\x00"  # str object of existing qstr
    # bytecode
    b"\x81\x28"  # 21 bytes, no children, bytecode
    b"\x00\x02"  # prelude
    b"\x01"  # simple name (<module>)
    b"\x23\x00"  # LOAD_CONST_OBJ(0)
    b"\x16\x02"  # STORE_NAME(str_obj)
    b"\x23\x01"  # LOAD_CONST_OBJ(1)
    b"\x16\x03"  # STORE_NAME(bytes_obj)
    b"\x23\x02"  # LOAD_CONST_OBJ(2)
    b"\x16\x04"  # STORE_NAME(int_obj)
    b"\x23\x03"  # LOAD_CONST_OBJ(3)
    b"\x16\x05"  # STORE_NAME(float_obj)
    b"\x51"  # LOAD_CONST_NONE
    b"\x63"  # RETURN_VALUE
)
|
|
|
|
|
|
class VfsRomWriter:
|
|
ROMFS_HEADER = b"\xd2\xcd\x31"
|
|
|
|
ROMFS_RECORD_KIND_UNUSED = 0
|
|
ROMFS_RECORD_KIND_PADDING = 1
|
|
ROMFS_RECORD_KIND_DATA_VERBATIM = 2
|
|
ROMFS_RECORD_KIND_DATA_POINTER = 3
|
|
ROMFS_RECORD_KIND_DIRECTORY = 4
|
|
ROMFS_RECORD_KIND_FILE = 5
|
|
|
|
def __init__(self):
|
|
self._dir_stack = [(None, bytearray())]
|
|
|
|
def _encode_uint(self, value):
|
|
encoded = [value & 0x7F]
|
|
value >>= 7
|
|
while value != 0:
|
|
encoded.insert(0, 0x80 | (value & 0x7F))
|
|
value >>= 7
|
|
return bytes(encoded)
|
|
|
|
def _pack(self, kind, payload):
|
|
return self._encode_uint(kind) + self._encode_uint(len(payload)) + payload
|
|
|
|
def _extend(self, data):
|
|
buf = self._dir_stack[-1][1]
|
|
buf.extend(data)
|
|
return len(buf)
|
|
|
|
def finalise(self):
|
|
_, data = self._dir_stack.pop()
|
|
encoded_kind = VfsRomWriter.ROMFS_HEADER
|
|
encoded_len = self._encode_uint(len(data))
|
|
if (len(encoded_kind) + len(encoded_len) + len(data)) % 2 == 1:
|
|
encoded_len = b"\x80" + encoded_len
|
|
data = encoded_kind + encoded_len + data
|
|
return data
|
|
|
|
def opendir(self, dirname):
|
|
self._dir_stack.append((dirname, bytearray()))
|
|
|
|
def closedir(self):
|
|
dirname, dirdata = self._dir_stack.pop()
|
|
dirdata = self._encode_uint(len(dirname)) + bytes(dirname, "ascii") + dirdata
|
|
self._extend(self._pack(VfsRomWriter.ROMFS_RECORD_KIND_DIRECTORY, dirdata))
|
|
|
|
def mkdata(self, data):
|
|
assert len(self._dir_stack) == 1
|
|
return self._extend(self._pack(VfsRomWriter.ROMFS_RECORD_KIND_DATA_VERBATIM, data)) - len(
|
|
data
|
|
)
|
|
|
|
def mkfile(self, filename, filedata, extra_payload=b""):
|
|
filename = bytes(filename, "ascii")
|
|
payload = self._encode_uint(len(filename))
|
|
payload += filename
|
|
payload += extra_payload
|
|
if isinstance(filedata, tuple):
|
|
sub_payload = self._encode_uint(filedata[0])
|
|
sub_payload += self._encode_uint(filedata[1])
|
|
payload += self._pack(VfsRomWriter.ROMFS_RECORD_KIND_DATA_POINTER, sub_payload)
|
|
else:
|
|
payload += self._pack(VfsRomWriter.ROMFS_RECORD_KIND_DATA_VERBATIM, filedata)
|
|
self._dir_stack[-1][1].extend(self._pack(VfsRomWriter.ROMFS_RECORD_KIND_FILE, payload))
|
|
|
|
|
|
def _make_romfs(fs, files, data_map):
|
|
for filename, contents in files:
|
|
if isinstance(contents, tuple):
|
|
fs.opendir(filename)
|
|
_make_romfs(fs, contents, data_map)
|
|
fs.closedir()
|
|
elif isinstance(contents, int):
|
|
fs.mkfile(filename, data_map[contents])
|
|
else:
|
|
fs.mkfile(filename, contents)
|
|
|
|
|
|
# Build and return a complete ROMFS image.
#
# `files` is a sequence of (name, entry) pairs as accepted by _make_romfs().
# `data`, if given, maps keys to blobs: each blob is written first with
# mkdata(), and an int entry in `files` then refers to it by key.
def make_romfs(files, data=None):
    fs = VfsRomWriter()
    # Each key maps to (blob length, value returned by mkdata() for the blob).
    data_map = {key: (len(blob), fs.mkdata(blob)) for key, blob in (data or {}).items()}
    _make_romfs(fs, files, data_map)
    return fs.finalise()
|
|
|
|
|
|
# Half-open interval [lower, upper) that supports the `in` operator for any
# values that can be ordered.  Used instead of the built-in range because
# MicroPython's range doesn't support arbitrary objects.
class Range:
    def __init__(self, lower, upper):
        self._lower, self._upper = lower, upper

    def __repr__(self):
        bounds = (self._lower, self._upper)
        return "Range({}, {})".format(*bounds)

    def __contains__(self, value):
        # The lower bound is inclusive, the upper bound is exclusive.
        return self._lower <= value and value < self._upper
|
|
|
|
|
|
# Shared fixture for the test classes below: builds one ROMFS image for the
# whole class, along with the listings and address range that the tests
# compare against.
class TestBase(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # A second, smaller filesystem that is stored inside the main one.
        inner_image = make_romfs((("test_inner.txt", b"contents_inner"), ("c.py", b"")))

        dir_entries = (
            ("a.py", b"x = 1"),
            ("b.py", b"x = 2"),
            ("test.mpy", test_mpy),
        )
        root_entries = (
            ("fs.romfs", 0),  # the integer is a key into the data mapping below
            ("test.txt", b"contents"),
            ("dir", dir_entries),
        )
        cls.romfs = make_romfs(root_entries, {0: inner_image})

        # Expected ilistdir() entries of the root directory, and the plain
        # name listings derived from them.
        cls.romfs_ilistdir = [
            ("fs.romfs", IFREG, 0, 46),
            ("test.txt", IFREG, 0, 8),
            ("dir", IFDIR, 0, 198),
        ]
        cls.romfs_listdir = [entry[0] for entry in cls.romfs_ilistdir]
        cls.romfs_listdir_dir = ["a.py", "b.py", "test.mpy"]
        cls.romfs_listdir_bytes = [bytes(name, "ascii") for name in cls.romfs_listdir]

        # Addresses spanned by the image, used to check that objects are
        # located inside it.
        base = uctypes.addressof(cls.romfs)
        cls.romfs_addr = base
        cls.romfs_addr_range = Range(base, base + len(cls.romfs))
|
|
|
|
|
|
# Tests of unusual but valid images, and of input that is not an image at all.
class TestEdgeCases(unittest.TestCase):
    def test_empty_romfs(self):
        # An image with no entries is just the header and a zero length.
        image = make_romfs(())
        expected_image = bytes([0x80 | ord("R"), 0x80 | ord("M"), ord("1"), 0])
        self.assertEqual(image, expected_image)

        rom = vfs.VfsRom(image)
        self.assertIsInstance(rom, vfs.VfsRom)
        rom.mount(True, False)
        self.assertEqual(list(rom.ilistdir("")), [])
        self.assertEqual(rom.stat(""), (IFDIR, 0, 0, 0, 0, 0, 0, 0, 0, 0))
        self.assertEqual(rom.statvfs(""), (1, 0, 0, 0, 0, 0, 0, 0, 0, 32767))

    def test_error(self):
        # Buffers that are not ROMFS images must be rejected with ENODEV.
        for not_an_image in (b"", b"xxx", b"not a romfs"):
            with self.assertRaises(OSError) as ctx:
                vfs.VfsRom(not_an_image)
            self.assertEqual(ctx.exception.errno, errno.ENODEV)

    def test_unknown_record(self):
        # Put a padding record at the top level and another inside the file
        # record; the file must still be listed and readable.
        writer = VfsRomWriter()
        top_level_padding = writer._pack(VfsRomWriter.ROMFS_RECORD_KIND_PADDING, b"payload")
        writer._extend(top_level_padding)
        in_file_padding = writer._pack(VfsRomWriter.ROMFS_RECORD_KIND_PADDING, b"pad")
        writer.mkfile("test", b"contents", extra_payload=in_file_padding)

        rom = vfs.VfsRom(writer.finalise())
        self.assertEqual(list(rom.ilistdir("")), [("test", IFREG, 0, 8)])
        with rom.open("test", "rb") as f:
            self.assertEqual(f.read(), b"contents")
|
|
|
|
|
|
# Tests that damaged images are reported with OSError.
class TestCorrupt(unittest.TestCase):
    # Construct a filesystem from `image` and check that stat() of `path`
    # raises OSError.  With `listing_empty` set, also check that the root
    # directory then lists as empty.
    def _assert_stat_fails(self, image, path, listing_empty=False):
        rom = vfs.VfsRom(image)
        with self.assertRaises(OSError):
            rom.stat(path)
        if listing_empty:
            self.assertEqual(list(rom.ilistdir("")), [])

    def test_corrupt_filesystem(self):
        # First make the filesystem length bigger than the buffer (0x01),
        # then corrupt the filesystem length (0xFF).  Both must be rejected
        # by the constructor.
        for length_byte in (0x01, 0xFF):
            image = bytearray(make_romfs(()))
            image[3] = length_byte
            with self.assertRaises(OSError):
                vfs.VfsRom(image)

        # Corrupt the contents of the filesystem.
        image = bytearray(make_romfs(()))
        image[3] = 0x01
        image.extend(b"\xff\xff")
        self._assert_stat_fails(image, "a", listing_empty=True)

    def test_corrupt_file_entry(self):
        pristine = make_romfs((("file", b"data"),))
        size = len(pristine)

        # Corrupt the length of filename.
        image = bytearray(pristine)
        image[7:] = b"\xff" * (size - 7)
        self._assert_stat_fails(image, "file", listing_empty=True)

        # Erase the data record (change it to a padding record).
        image = bytearray(pristine)
        image[12] = VfsRomWriter.ROMFS_RECORD_KIND_PADDING
        self._assert_stat_fails(image, "file", listing_empty=True)

        # Corrupt the header of the data record (from offset 12), then only
        # the interior of the data record (from offset 13).
        for start in (12, 13):
            image = bytearray(pristine)
            image[start:] = b"\xff" * (size - start)
            self._assert_stat_fails(image, "file")

        # Change the data record to an indirect pointer and corrupt first the
        # length, then the offset.
        for pointer_bytes in (b"\xff\xff\xff\xff", b"\x00\xff\xff\xff"):
            image = bytearray(pristine)
            image[12] = VfsRomWriter.ROMFS_RECORD_KIND_DATA_POINTER
            image[14:18] = pointer_bytes
            self._assert_stat_fails(image, "file")
|
|
|
|
|
|
# Tests that use a VfsRom object directly, without mounting it via vfs.mount().
class TestStandalone(TestBase):
    def test_constructor(self):
        self.assertIsInstance(vfs.VfsRom(self.romfs), vfs.VfsRom)
        # An integer address is not accepted in place of a buffer.
        with self.assertRaises(TypeError):
            vfs.VfsRom(self.romfs_addr)

    def test_mount(self):
        rom = vfs.VfsRom(self.romfs)
        rom.mount(True, False)
        with self.assertRaises(OSError):
            vfs.VfsRom(self.romfs).mount(True, True)

    def test_ilistdir(self):
        rom = vfs.VfsRom(self.romfs)
        for root in ("", "/"):
            self.assertEqual(list(rom.ilistdir(root)), self.romfs_ilistdir)
        with self.assertRaises(OSError):
            rom.ilistdir("does not exist")

    def test_stat(self):
        rom = vfs.VfsRom(self.romfs)
        root_stat = (IFDIR, 0, 0, 0, 0, 0, 289, 0, 0, 0)
        expectations = (
            ("", root_stat),
            ("/", root_stat),
            ("/test.txt", (IFREG, 0, 0, 0, 0, 0, 8, 0, 0, 0)),
            ("/dir", (IFDIR, 0, 0, 0, 0, 0, 198, 0, 0, 0)),
        )
        for path, expected in expectations:
            self.assertEqual(rom.stat(path), expected)
        with self.assertRaises(OSError):
            rom.stat("/does-not-exist")

    def test_statvfs(self):
        rom = vfs.VfsRom(self.romfs)
        self.assertEqual(rom.statvfs(""), (1, 0, 289, 0, 0, 0, 0, 0, 0, 32767))

    def test_open(self):
        rom = vfs.VfsRom(self.romfs)

        # The default and "rt" modes give str, "rb" gives bytes.
        for mode, expected in (("", "contents"), ("rt", "contents"), ("rb", b"contents")):
            with rom.open("/test.txt", mode) as f:
                self.assertEqual(f.read(), expected)

        with self.assertRaises(OSError) as ctx:
            rom.open("/file-does-not-exist", "")
        self.assertEqual(ctx.exception.errno, errno.ENOENT)

        with self.assertRaises(OSError) as ctx:
            rom.open("/dir", "rb")
        self.assertEqual(ctx.exception.errno, errno.EISDIR)

        # Modes that would modify the file are refused.
        for mode in ("w", "a", "+"):
            with self.assertRaises(OSError):
                rom.open("/test.txt", mode)

    def test_file_seek(self):
        rom = vfs.VfsRom(self.romfs)
        with rom.open("/test.txt", "") as stream:
            self.assertEqual(stream.seek(0, SEEK_SET), 0)
            self.assertEqual(stream.seek(3, SEEK_SET), 3)
            self.assertEqual(stream.read(), "tents")

            # Seeking past the end is expected to stop at the file size (8).
            self.assertEqual(stream.seek(0, SEEK_SET), 0)
            self.assertEqual(stream.seek(100, SEEK_CUR), 8)
            self.assertEqual(stream.seek(-1, SEEK_END), 7)
            self.assertEqual(stream.read(), "s")
            self.assertEqual(stream.seek(1, SEEK_END), 8)

            # Seeking to before the start is an error.
            with self.assertRaises(OSError):
                stream.seek(-1, SEEK_SET)
            stream.seek(0, SEEK_SET)
            with self.assertRaises(OSError):
                stream.seek(-1, SEEK_CUR)
            with self.assertRaises(OSError):
                stream.seek(-100, SEEK_END)

    @unittest.skipIf(select is None, "no select module")
    def test_file_ioctl_invalid(self):
        rom = vfs.VfsRom(self.romfs)
        with rom.open("/test.txt", "") as stream:
            poller = select.poll()
            poller.register(stream)
            with self.assertRaises(OSError):
                poller.poll(0)

    def test_memory_mapping(self):
        rom = vfs.VfsRom(self.romfs)
        with rom.open("/test.txt", "rb") as stream:
            start = uctypes.addressof(stream)
            view = memoryview(stream)
            # Both ends of the file data must lie inside the image.
            self.assertIn(start, self.romfs_addr_range)
            self.assertIn(start + len(view), self.romfs_addr_range)
            self.assertEqual(bytes(view), b"contents")
|
|
|
|
|
|
# Tests that access the filesystem through the os module, open() and import,
# with the image mounted at /test_rom for the duration of each test.
class TestMounted(TestBase):
    def setUp(self):
        # Remember the state that the tests change, so tearDown() can undo it.
        self.orig_sys_path = list(sys.path)
        self.orig_cwd = os.getcwd()
        vfs.mount(vfs.VfsRom(self.romfs), "/test_rom")

    def tearDown(self):
        vfs.umount("/test_rom")
        os.chdir(self.orig_cwd)
        sys.path = self.orig_sys_path

    def test_listdir(self):
        self.assertEqual(os.listdir("/test_rom"), self.romfs_listdir)
        self.assertEqual(os.listdir("/test_rom/dir"), self.romfs_listdir_dir)
        self.assertEqual(os.listdir(b"/test_rom"), self.romfs_listdir_bytes)

    def test_chdir(self):
        os.chdir("/test_rom")
        self.assertEqual(os.listdir(), self.romfs_listdir)

        os.chdir("/test_rom/")
        self.assertEqual(os.listdir(), self.romfs_listdir)

        # chdir within the romfs is not implemented.
        with self.assertRaises(OSError):
            os.chdir("/test_rom/dir")

    def test_stat(self):
        self.assertEqual(os.stat("/test_rom"), (IFDIR, 0, 0, 0, 0, 0, 289, 0, 0, 0))
        self.assertEqual(os.stat("/test_rom/"), (IFDIR, 0, 0, 0, 0, 0, 289, 0, 0, 0))
        self.assertEqual(os.stat("/test_rom/test.txt"), (IFREG, 0, 0, 0, 0, 0, 8, 0, 0, 0))
        self.assertEqual(os.stat("/test_rom/dir"), (IFDIR, 0, 0, 0, 0, 0, 198, 0, 0, 0))
        with self.assertRaises(OSError):
            os.stat("/test_rom/does-not-exist")

    def test_open(self):
        with open("/test_rom/test.txt") as f:
            self.assertEqual(f.read(), "contents")
        with open("/test_rom/test.txt", "b") as f:
            self.assertEqual(f.read(), b"contents")

        with self.assertRaises(OSError) as ctx:
            open("/test_rom/file-does-not-exist")
        self.assertEqual(ctx.exception.errno, errno.ENOENT)

        with self.assertRaises(OSError) as ctx:
            open("/test_rom/dir")
        self.assertEqual(ctx.exception.errno, errno.EISDIR)

    def test_import_py(self):
        sys.path.append("/test_rom/dir")
        a = __import__("a")
        b = __import__("b")
        self.assertEqual(a.__file__, "/test_rom/dir/a.py")
        self.assertEqual(a.x, 1)
        self.assertEqual(b.__file__, "/test_rom/dir/b.py")
        self.assertEqual(b.x, 2)

    def test_import_mpy(self):
        sys.path.append("/test_rom/dir")
        test = __import__("test")
        self.assertEqual(test.__file__, "/test_rom/dir/test.mpy")
        self.assertEqual(test.str_obj, "this is a str object")
        self.assertEqual(test.bytes_obj, b"this is a bytes object")
        self.assertEqual(test.int_obj, 1234567890)
        self.assertEqual(test.float_obj, 1.23)
        # The str and bytes constants must be located inside the image.
        self.assertIn(uctypes.addressof(test.str_obj), self.romfs_addr_range)
        self.assertIn(uctypes.addressof(test.bytes_obj), self.romfs_addr_range)

    def test_romfs_inner(self):
        # Create a second filesystem from the file stored in the first one.
        with open("/test_rom/fs.romfs", "rb") as f:
            romfs_inner = vfs.VfsRom(memoryview(f))

        vfs.mount(romfs_inner, "/test_rom2")
        # Unmount in a finally block so that a failing assertion below does
        # not leave /test_rom2 mounted for whatever runs afterwards.
        try:
            self.assertEqual(os.listdir("/test_rom2"), ["test_inner.txt", "c.py"])

            sys.path.append("/test_rom2")
            self.assertEqual(__import__("c").__file__, "/test_rom2/c.py")

            with open("/test_rom2/test_inner.txt") as f:
                self.assertEqual(f.read(), "contents_inner")

            # The inner file's data must lie inside the outer image.
            with open("/test_rom2/test_inner.txt", "rb") as f:
                addr = uctypes.addressof(f)
                data = memoryview(f)
                self.assertIn(addr, self.romfs_addr_range)
                self.assertIn(addr + len(data), self.romfs_addr_range)
        finally:
            vfs.umount("/test_rom2")
|
|
|
|
|
|
# Run every test case defined above when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
|