mirror of
https://github.com/micropython/micropython.git
synced 2025-09-08 02:40:55 +02:00
Adds a Python override of the `machine` module, which delegates to the built-in module and adds an implementation of `Counter` and `Encoder`, based on the `esp32.PCNT` class. Original implementation by: Jonathan Hogg <me@jonathanhogg.com> Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
193 lines
7.0 KiB
Python
193 lines
7.0 KiB
Python
import sys
|
|
|
|
_path = sys.path
|
|
sys.path = ()
|
|
try:
|
|
import machine as _machine
|
|
finally:
|
|
sys.path = _path
|
|
del _path
|
|
del sys
|
|
|
|
|
|
from micropython import const
|
|
import esp32
|
|
|
|
if hasattr(esp32, "PCNT"):
|
|
_PCNT_RANGE = const(32000)
|
|
|
|
class _CounterBase:
|
|
_PCNT = esp32.PCNT
|
|
# Singletons, keyed by PCNT unit_id (shared by both Counter & Encoder).
|
|
_INSTANCES = {}
|
|
|
|
# Use __new__ to implement a singleton rather than a factory function,
|
|
# because we need to be able to provide class attributes, e.g.
|
|
# Counter.RISING, which is not possible if Counter was a function
|
|
# (functions cannot have attributes in MicroPython).
|
|
def __new__(cls, unit_id, *_args, **_kwargs):
|
|
# Find an existing instance for this PCNT unit id.
|
|
self = cls._INSTANCES.get(unit_id)
|
|
|
|
if self:
|
|
# Verify that this PCNT is being used for the same type
|
|
# (Encoder or Counter).
|
|
if not isinstance(self, cls):
|
|
raise ValueError("PCNT in use")
|
|
else:
|
|
# Previously unused PCNT unit.
|
|
self = object.__new__(cls)
|
|
cls._INSTANCES[unit_id] = self
|
|
|
|
# __init__ will now be called with the same args.
|
|
return self
|
|
|
|
def __init__(self, unit_id, *args, filter_ns=0, **kwargs):
|
|
self._unit_id = unit_id
|
|
|
|
if not hasattr(self, "_pcnt"):
|
|
# New instance, or previously deinit-ed.
|
|
self._pcnt = self._PCNT(unit_id, min=-_PCNT_RANGE, max=_PCNT_RANGE)
|
|
elif not (args or kwargs):
|
|
# Existing instance, and no args, so accessing the existing
|
|
# singleton without reconfiguring. Note: This means that
|
|
# Counter/Encoder cannot be partially re-initalised. Either
|
|
# you get the existing instance as-is (by passing no arguments
|
|
# other than the id), or you must pass all the necessary
|
|
# arguments to additionally re-configure it.
|
|
return
|
|
|
|
# Counter- or Encoder-specific configuration of self._pcnt.
|
|
self._configure(*args, **kwargs)
|
|
|
|
# Common unit configuration.
|
|
self._pcnt.init(
|
|
filter=min(max(0, filter_ns * 80 // 1000), 1023),
|
|
value=0,
|
|
)
|
|
|
|
# Note: We track number-of-overflows rather than the actual count in
|
|
# order to avoid the IRQ handler overflowing MicroPython's "small int"
|
|
# range. This gives an effective range of 2**30 overflows. User code
|
|
# should use counter.value(0) to reset the overflow count.
|
|
# The ESP32 PCNT resets to zero on under/overflow (i.e. it does not wrap
|
|
# around to the opposite limit), so each overflow corresponds to exactly
|
|
# _PCNT_RANGE counts.
|
|
|
|
# Reset counter state.
|
|
self._overflows = 0
|
|
self._offset = 0
|
|
|
|
# Install IRQ handler to handle under/overflow.
|
|
self._pcnt.irq(self._overflow, self._PCNT.IRQ_MIN | self._PCNT.IRQ_MAX)
|
|
|
|
# Start counting.
|
|
self._pcnt.start()
|
|
|
|
# Handle counter under/overflow.
|
|
def _overflow(self, pcnt):
|
|
mask = pcnt.irq().flags()
|
|
if mask & self._PCNT.IRQ_MIN:
|
|
self._overflows -= 1
|
|
elif mask & self._PCNT.IRQ_MAX:
|
|
self._overflows += 1
|
|
|
|
# Public machine.Counter & machine.Encoder API.
|
|
def init(self, *args, **kwargs):
|
|
self.__init__(self._unit_id, *args, **kwargs)
|
|
|
|
# Public machine.Counter & machine.Encoder API.
|
|
def deinit(self):
|
|
if hasattr(self, "_pcnt"):
|
|
self._pcnt.deinit()
|
|
del self._pcnt
|
|
|
|
# Public machine.Counter & machine.Encoder API.
|
|
def value(self, value=None):
|
|
if not hasattr(self, "_pcnt"):
|
|
raise RuntimeError("not initialised")
|
|
|
|
# This loop deals with the possibility that a PCNT overflow occurs
|
|
# between retrieving self._overflows and self._pcnt.value().
|
|
while True:
|
|
overflows = self._overflows
|
|
current = self._pcnt.value()
|
|
# Calling PCNT.value() forces any pending interrupts to run
|
|
# for this PCNT unit. So self._overflows must now be the the
|
|
# value corresponding to the value we read.
|
|
if self._overflows == overflows:
|
|
break
|
|
|
|
# Compute the result including the number of times we've cycled
|
|
# through the range, and any applied offset.
|
|
result = overflows * _PCNT_RANGE + current + self._offset
|
|
|
|
# If a new value is specified, then zero out the overflows, and set
|
|
# self._offset so that it zeros out the current PCNT value. The
|
|
# mutation to self._overflows is atomic w.r.t. the overflow IRQ
|
|
# handler because the scheduler only runs on branch instructions.
|
|
if value is not None:
|
|
self._overflows -= overflows
|
|
self._offset = value - current
|
|
|
|
return result
|
|
|
|
class Counter(_CounterBase):
|
|
# Public machine.Counter API.
|
|
RISING = 1
|
|
FALLING = 2
|
|
UP = _CounterBase._PCNT.INCREMENT
|
|
DOWN = _CounterBase._PCNT.DECREMENT
|
|
|
|
# Counter-specific configuration.
|
|
def _configure(self, src, edge=RISING, direction=UP):
|
|
# Only use the first channel.
|
|
self._pcnt.init(
|
|
channel=0,
|
|
pin=src,
|
|
rising=direction if edge & Counter.RISING else self._PCNT.IGNORE,
|
|
falling=direction if edge & Counter.FALLING else self._PCNT.IGNORE,
|
|
)
|
|
|
|
class Encoder(_CounterBase):
|
|
# Encoder-specific configuration.
|
|
def _configure(self, phase_a, phase_b, phases=1):
|
|
if phases not in (1, 2, 4):
|
|
raise ValueError("phases")
|
|
# Configure the first channel.
|
|
self._pcnt.init(
|
|
channel=0,
|
|
pin=phase_a,
|
|
falling=self._PCNT.INCREMENT,
|
|
rising=self._PCNT.DECREMENT,
|
|
mode_pin=phase_b,
|
|
mode_low=self._PCNT.HOLD if phases == 1 else self._PCNT.REVERSE,
|
|
)
|
|
if phases == 4:
|
|
# For 4x quadrature, enable the second channel.
|
|
self._pcnt.init(
|
|
channel=1,
|
|
pin=phase_b,
|
|
falling=self._PCNT.DECREMENT,
|
|
rising=self._PCNT.INCREMENT,
|
|
mode_pin=phase_a,
|
|
mode_low=self._PCNT.REVERSE,
|
|
)
|
|
else:
|
|
# For 1x and 2x quadrature, disable the second channel.
|
|
self._pcnt.init(channel=1, pin=None, rising=self._PCNT.IGNORE)
|
|
self._phases = phases
|
|
|
|
def phases(self):
|
|
return self._phases
|
|
|
|
del _CounterBase
|
|
|
|
|
|
del esp32
|
|
|
|
|
|
# Delegate to built-in machine module.
|
|
def __getattr__(attr):
|
|
return getattr(_machine, attr)
|