mirror of
https://github.com/pyapp-kit/superqt.git
synced 2025-07-21 12:11:07 +02:00
Qthrottler and debouncer (#62)
* initial working throttler * complete typing and docs * basic test * fix future subscript * touch ups * Update src/superqt/utils/_throttler.py Co-authored-by: Grzegorz Bokota <bokota+github@gmail.com> * Update src/superqt/utils/_throttler.py Co-authored-by: Grzegorz Bokota <bokota+github@gmail.com> * Update src/superqt/utils/_throttler.py Co-authored-by: Grzegorz Bokota <bokota+github@gmail.com> Co-authored-by: Grzegorz Bokota <bokota+github@gmail.com>
This commit is contained in:
29
examples/throttle_mouse_event.py
Normal file
29
examples/throttle_mouse_event.py
Normal file
@@ -0,0 +1,29 @@
|
||||
from qtpy.QtCore import Signal
|
||||
from qtpy.QtWidgets import QApplication, QWidget
|
||||
|
||||
from superqt.utils import qthrottled
|
||||
|
||||
|
||||
class Demo(QWidget):
|
||||
positionChanged = Signal(int, int)
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.setMouseTracking(True)
|
||||
self.positionChanged.connect(self._show_location)
|
||||
|
||||
@qthrottled(timeout=400) # call this no more than once every 400ms
|
||||
def _show_location(self, x, y):
|
||||
print("Throttled event at", x, y)
|
||||
|
||||
def mouseMoveEvent(self, event):
|
||||
print("real move event at", event.x(), event.y())
|
||||
self.positionChanged.emit(event.x(), event.y())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app = QApplication([])
|
||||
w = Demo()
|
||||
w.resize(600, 600)
|
||||
w.show()
|
||||
app.exec_()
|
282
examples/throttler_demo.py
Normal file
282
examples/throttler_demo.py
Normal file
@@ -0,0 +1,282 @@
|
||||
"""Adapted for python from the KDToolBox
|
||||
|
||||
https://github.com/KDAB/KDToolBox/tree/master/qt/KDSignalThrottler
|
||||
|
||||
MIT License
|
||||
|
||||
Copyright (C) 2019-2022 Klarälvdalens Datakonsult AB, a KDAB Group company,
|
||||
info@kdab.com
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
|
||||
"""
|
||||
|
||||
from typing import Deque
|
||||
|
||||
from qtpy.QtCore import QRect, QSize, Qt, QTimer, Signal
|
||||
from qtpy.QtGui import QPainter, QPen
|
||||
from qtpy.QtWidgets import (
|
||||
QApplication,
|
||||
QCheckBox,
|
||||
QComboBox,
|
||||
QFormLayout,
|
||||
QGroupBox,
|
||||
QHBoxLayout,
|
||||
QLabel,
|
||||
QPushButton,
|
||||
QSizePolicy,
|
||||
QSpinBox,
|
||||
QVBoxLayout,
|
||||
QWidget,
|
||||
)
|
||||
|
||||
from superqt.utils._throttler import (
|
||||
GenericSignalThrottler,
|
||||
QSignalDebouncer,
|
||||
QSignalThrottler,
|
||||
)
|
||||
|
||||
|
||||
class DrawSignalsWidget(QWidget):
|
||||
def __init__(self, parent=None):
|
||||
super().__init__(parent)
|
||||
self.setFocusPolicy(Qt.FocusPolicy.NoFocus)
|
||||
self.setSizePolicy(QSizePolicy.MinimumExpanding, QSizePolicy.MinimumExpanding)
|
||||
self.setAttribute(Qt.WA_OpaquePaintEvent)
|
||||
|
||||
self._scrollTimer = QTimer(self)
|
||||
self._scrollTimer.setInterval(10)
|
||||
self._scrollTimer.timeout.connect(self._scroll)
|
||||
self._scrollTimer.start()
|
||||
|
||||
self._signalActivations: Deque[int] = Deque()
|
||||
self._throttledSignalActivations: Deque[int] = Deque()
|
||||
|
||||
def sizeHint(self):
|
||||
return QSize(400, 200)
|
||||
|
||||
def addSignalActivation(self):
|
||||
self._signalActivations.appendleft(0)
|
||||
|
||||
def addThrottledSignalActivation(self):
|
||||
self._throttledSignalActivations.appendleft(0)
|
||||
|
||||
def _scroll(self):
|
||||
cutoff = self.width()
|
||||
self.scrollAndCut(self._signalActivations, cutoff)
|
||||
self.scrollAndCut(self._throttledSignalActivations, cutoff)
|
||||
|
||||
self.update()
|
||||
|
||||
def scrollAndCut(self, v: Deque[int], cutoff: int):
|
||||
x = 0
|
||||
L = len(v)
|
||||
for p in range(L):
|
||||
v[p] += 1
|
||||
if v[p] > cutoff:
|
||||
x = p
|
||||
break
|
||||
|
||||
# TODO: fix this... delete old ones
|
||||
|
||||
def paintEvent(self, event):
|
||||
p = QPainter(self)
|
||||
p.fillRect(self.rect(), Qt.white)
|
||||
|
||||
h = self.height()
|
||||
h2 = h // 2
|
||||
w = self.width()
|
||||
|
||||
self._drawSignals(p, self._signalActivations, Qt.red, 0, h2)
|
||||
self._drawSignals(p, self._throttledSignalActivations, Qt.blue, h2, h)
|
||||
|
||||
p.drawText(
|
||||
QRect(0, 0, w, h2),
|
||||
Qt.AlignmentFlag.AlignRight | Qt.AlignmentFlag.AlignVCenter,
|
||||
"Source signal",
|
||||
)
|
||||
p.drawText(
|
||||
QRect(0, h2, w, h2),
|
||||
Qt.AlignmentFlag.AlignRight | Qt.AlignmentFlag.AlignVCenter,
|
||||
"Throttled signal",
|
||||
)
|
||||
|
||||
p.save()
|
||||
pen = QPen()
|
||||
pen.setWidthF(2.0)
|
||||
p.drawLine(0, h2, w, h2)
|
||||
p.restore()
|
||||
|
||||
def _drawSignals(self, p: QPainter, v: Deque[int], color, yStart, yEnd):
|
||||
p.save()
|
||||
pen = QPen()
|
||||
pen.setWidthF(2.0)
|
||||
pen.setColor(color)
|
||||
p.setPen(pen)
|
||||
|
||||
for i in v:
|
||||
p.drawLine(i, yStart, i, yEnd)
|
||||
p.restore()
|
||||
|
||||
|
||||
class DemoWidget(QWidget):
|
||||
signalToBeThrottled = Signal()
|
||||
_throttler: GenericSignalThrottler
|
||||
|
||||
def __init__(self, parent=None) -> None:
|
||||
super().__init__(parent)
|
||||
self._createUi()
|
||||
self._throttler = None
|
||||
|
||||
self._throttlerKindComboBox.currentIndexChanged.connect(self._createThrottler)
|
||||
self._createThrottler()
|
||||
|
||||
self._throttlerTimeoutSpinBox.valueChanged.connect(self.setThrottlerTimeout)
|
||||
self.setThrottlerTimeout()
|
||||
|
||||
self._mainButton.clicked.connect(self.signalToBeThrottled)
|
||||
|
||||
self._autoTriggerTimer = QTimer(self)
|
||||
self._autoTriggerTimer.setTimerType(Qt.TimerType.PreciseTimer)
|
||||
self._autoTriggerCheckBox.clicked.connect(self._startOrStopAutoTriggerTimer)
|
||||
self._startOrStopAutoTriggerTimer()
|
||||
|
||||
self._autoTriggerIntervalSpinBox.valueChanged.connect(
|
||||
self._setAutoTriggerTimeout
|
||||
)
|
||||
self._setAutoTriggerTimeout()
|
||||
|
||||
self._autoTriggerTimer.timeout.connect(self.signalToBeThrottled)
|
||||
self.signalToBeThrottled.connect(self._drawSignalsWidget.addSignalActivation)
|
||||
|
||||
def _createThrottler(self) -> None:
|
||||
if self._throttler is not None:
|
||||
self._throttler.deleteLater()
|
||||
del self._throttler
|
||||
|
||||
if self._throttlerKindComboBox.currentIndex() < 2:
|
||||
cls = QSignalThrottler
|
||||
else:
|
||||
cls = QSignalDebouncer
|
||||
if self._throttlerKindComboBox.currentIndex() % 2:
|
||||
policy = QSignalThrottler.EmissionPolicy.Leading
|
||||
else:
|
||||
policy = QSignalThrottler.EmissionPolicy.Trailing
|
||||
|
||||
self._throttler: GenericSignalThrottler = cls(policy, self)
|
||||
|
||||
self._throttler.setTimerType(Qt.TimerType.PreciseTimer)
|
||||
self.signalToBeThrottled.connect(self._throttler.throttle)
|
||||
self._throttler.triggered.connect(
|
||||
self._drawSignalsWidget.addThrottledSignalActivation
|
||||
)
|
||||
|
||||
self.setThrottlerTimeout()
|
||||
|
||||
def setThrottlerTimeout(self):
|
||||
self._throttler.setTimeout(self._throttlerTimeoutSpinBox.value())
|
||||
|
||||
def _startOrStopAutoTriggerTimer(self):
|
||||
shouldStart = self._autoTriggerCheckBox.isChecked()
|
||||
if shouldStart:
|
||||
self._autoTriggerTimer.start()
|
||||
else:
|
||||
self._autoTriggerTimer.stop()
|
||||
|
||||
self._autoTriggerIntervalSpinBox.setEnabled(shouldStart)
|
||||
self._autoTriggerLabel.setEnabled(shouldStart)
|
||||
|
||||
def _setAutoTriggerTimeout(self):
|
||||
timeout = self._autoTriggerIntervalSpinBox.value()
|
||||
self._autoTriggerTimer.setInterval(timeout)
|
||||
|
||||
def _createUi(self):
|
||||
helpLabel = QLabel(self)
|
||||
helpLabel.setWordWrap(True)
|
||||
helpLabel.setText(
|
||||
"<h2>SignalThrottler example</h2>"
|
||||
"<p>This example demonstrates the differences between "
|
||||
"the different kinds of signal throttlers and debouncers."
|
||||
)
|
||||
|
||||
throttlerKindGroupBox = QGroupBox("Throttler configuration", self)
|
||||
|
||||
self._throttlerKindComboBox = QComboBox(throttlerKindGroupBox)
|
||||
self._throttlerKindComboBox.addItems(
|
||||
(
|
||||
"Throttler, trailing",
|
||||
"Throttler, leading",
|
||||
"Debouncer, trailing",
|
||||
"Debouncer, leading",
|
||||
)
|
||||
)
|
||||
|
||||
self._throttlerTimeoutSpinBox = QSpinBox(throttlerKindGroupBox)
|
||||
self._throttlerTimeoutSpinBox.setRange(1, 5000)
|
||||
self._throttlerTimeoutSpinBox.setValue(500)
|
||||
self._throttlerTimeoutSpinBox.setSuffix(" ms")
|
||||
|
||||
layout = QFormLayout(throttlerKindGroupBox)
|
||||
layout.addRow("Kind of throttler:", self._throttlerKindComboBox)
|
||||
layout.addRow("Timeout:", self._throttlerTimeoutSpinBox)
|
||||
throttlerKindGroupBox.setLayout(layout)
|
||||
|
||||
buttonGroupBox = QGroupBox("Throttler activation")
|
||||
self._mainButton = QPushButton(("Press me!"), buttonGroupBox)
|
||||
|
||||
self._autoTriggerCheckBox = QCheckBox("Trigger automatically")
|
||||
|
||||
autoTriggerLayout = QHBoxLayout()
|
||||
self._autoTriggerLabel = QLabel("Interval", buttonGroupBox)
|
||||
self._autoTriggerIntervalSpinBox = QSpinBox(buttonGroupBox)
|
||||
self._autoTriggerIntervalSpinBox.setRange(1, 5000)
|
||||
self._autoTriggerIntervalSpinBox.setValue(100)
|
||||
self._autoTriggerIntervalSpinBox.setSuffix(" ms")
|
||||
|
||||
autoTriggerLayout.setContentsMargins(0, 0, 0, 0)
|
||||
autoTriggerLayout.addWidget(self._autoTriggerLabel)
|
||||
autoTriggerLayout.addWidget(self._autoTriggerIntervalSpinBox)
|
||||
|
||||
layout = QVBoxLayout(buttonGroupBox)
|
||||
layout.addWidget(self._mainButton)
|
||||
layout.addWidget(self._autoTriggerCheckBox)
|
||||
layout.addLayout(autoTriggerLayout)
|
||||
buttonGroupBox.setLayout(layout)
|
||||
|
||||
resultGroupBox = QGroupBox("Result")
|
||||
self._drawSignalsWidget = DrawSignalsWidget(resultGroupBox)
|
||||
layout = QVBoxLayout(resultGroupBox)
|
||||
layout.addWidget(self._drawSignalsWidget)
|
||||
resultGroupBox.setLayout(layout)
|
||||
|
||||
layout = QVBoxLayout(self)
|
||||
layout.addWidget(helpLabel)
|
||||
layout.addWidget(throttlerKindGroupBox)
|
||||
layout.addWidget(buttonGroupBox)
|
||||
layout.addWidget(resultGroupBox)
|
||||
|
||||
self.setLayout(layout)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app = QApplication([__name__])
|
||||
w = DemoWidget()
|
||||
w.resize(600, 600)
|
||||
w.show()
|
||||
app.exec_()
|
@@ -8,6 +8,10 @@ __all__ = (
|
||||
"QMessageHandler",
|
||||
"thread_worker",
|
||||
"WorkerBase",
|
||||
"qthrottled",
|
||||
"qdebounced",
|
||||
"QSignalDebouncer",
|
||||
"QSignalThrottler",
|
||||
)
|
||||
|
||||
from ._ensure_thread import ensure_main_thread, ensure_object_thread
|
||||
@@ -20,3 +24,4 @@ from ._qthreading import (
|
||||
new_worker_qthread,
|
||||
thread_worker,
|
||||
)
|
||||
from ._throttler import QSignalDebouncer, QSignalThrottler, qdebounced, qthrottled
|
||||
|
367
src/superqt/utils/_throttler.py
Normal file
367
src/superqt/utils/_throttler.py
Normal file
@@ -0,0 +1,367 @@
|
||||
"""Adapted for python from the KDToolBox
|
||||
|
||||
https://github.com/KDAB/KDToolBox/tree/master/qt/KDSignalThrottler
|
||||
|
||||
MIT License
|
||||
|
||||
Copyright (C) 2019-2022 Klarälvdalens Datakonsult AB, a KDAB Group company,
|
||||
info@kdab.com
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
|
||||
"""
|
||||
import sys
|
||||
from concurrent.futures import Future
|
||||
from enum import IntFlag, auto
|
||||
from functools import wraps
|
||||
from typing import TYPE_CHECKING, Callable, Generic, Optional, TypeVar, Union, overload
|
||||
|
||||
from qtpy.QtCore import QObject, Qt, QTimer, Signal, SignalInstance
|
||||
from typing_extensions import Literal, ParamSpec
|
||||
|
||||
|
||||
class Kind(IntFlag):
|
||||
Throttler = auto()
|
||||
Debouncer = auto()
|
||||
|
||||
|
||||
class EmissionPolicy(IntFlag):
|
||||
Trailing = auto()
|
||||
Leading = auto()
|
||||
|
||||
|
||||
class GenericSignalThrottler(QObject):
|
||||
|
||||
triggered = Signal()
|
||||
timeoutChanged = Signal(int)
|
||||
timerTypeChanged = Signal(Qt.TimerType)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
kind: Kind,
|
||||
emissionPolicy: EmissionPolicy,
|
||||
parent: Optional[QObject] = None,
|
||||
) -> None:
|
||||
super().__init__(parent)
|
||||
|
||||
self._kind = kind
|
||||
self._emissionPolicy = emissionPolicy
|
||||
self._hasPendingEmission = False
|
||||
|
||||
self._timer = QTimer()
|
||||
self._timer.setSingleShot(True)
|
||||
self._timer.setTimerType(Qt.TimerType.PreciseTimer)
|
||||
self._timer.timeout.connect(self._maybeEmitTriggered)
|
||||
|
||||
def kind(self) -> Kind:
|
||||
"""Return the kind of throttler (throttler or debouncer)."""
|
||||
return self._kind
|
||||
|
||||
def emissionPolicy(self) -> EmissionPolicy:
|
||||
"""Return the emission policy (trailing or leading)."""
|
||||
return self._emissionPolicy
|
||||
|
||||
def timeout(self) -> int:
|
||||
"""Return current timeout in milliseconds."""
|
||||
return self._timer.interval() # type: ignore
|
||||
|
||||
def setTimeout(self, timeout: int) -> None:
|
||||
"""Set timeout in milliseconds"""
|
||||
if self._timer.interval() != timeout:
|
||||
self._timer.setInterval(timeout)
|
||||
self.timeoutChanged.emit(timeout)
|
||||
|
||||
def timerType(self) -> Qt.TimerType:
|
||||
"""Return current Qt.TimerType."""
|
||||
return self._timer.timerType()
|
||||
|
||||
def setTimerType(self, timerType: Qt.TimerType) -> None:
|
||||
"""Set current Qt.TimerType."""
|
||||
if self._timer.timerType() != timerType:
|
||||
self._timer.setTimerType(timerType)
|
||||
self.timerTypeChanged.emit(timerType)
|
||||
|
||||
def throttle(self) -> None:
|
||||
"""Emit triggered if not running, then start timer."""
|
||||
# public slot
|
||||
self._hasPendingEmission = True
|
||||
# Emit only if we haven't emitted already. We know if that's
|
||||
# the case by checking if the timer is running.
|
||||
if (
|
||||
self._emissionPolicy is EmissionPolicy.Leading
|
||||
and not self._timer.isActive()
|
||||
):
|
||||
self._emitTriggered()
|
||||
|
||||
# The timer is started in all cases. If we got a signal, and we're Leading,
|
||||
# and we did emit because of that, then we don't re-emit when the timer fires
|
||||
# (unless we get ANOTHER signal).
|
||||
if self._kind is Kind.Throttler: # sourcery skip: merge-duplicate-blocks
|
||||
if not self._timer.isActive():
|
||||
self._timer.start() # actual start, not restart
|
||||
elif self._kind is Kind.Debouncer:
|
||||
self._timer.start() # restart
|
||||
|
||||
assert self._timer.isActive()
|
||||
|
||||
def cancel(self) -> None:
|
||||
""" "Cancel and pending emissions."""
|
||||
self._hasPendingEmission = False
|
||||
|
||||
def flush(self) -> None:
|
||||
""" "Force emission of any pending emissions."""
|
||||
self._maybeEmitTriggered()
|
||||
|
||||
def _emitTriggered(self) -> None:
|
||||
self._hasPendingEmission = False
|
||||
self.triggered.emit()
|
||||
self._timer.start()
|
||||
|
||||
def _maybeEmitTriggered(self) -> None:
|
||||
if self._hasPendingEmission:
|
||||
self._emitTriggered()
|
||||
|
||||
Kind = Kind
|
||||
EmissionPolicy = EmissionPolicy
|
||||
|
||||
|
||||
# ### Convenience classes ###
|
||||
|
||||
|
||||
class QSignalThrottler(GenericSignalThrottler):
|
||||
"""A Signal Throttler.
|
||||
|
||||
This object's `triggered` signal will emit at most once per timeout
|
||||
(set with setTimeout()).
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
policy: EmissionPolicy = EmissionPolicy.Leading,
|
||||
parent: Optional[QObject] = None,
|
||||
) -> None:
|
||||
super().__init__(Kind.Throttler, policy, parent)
|
||||
|
||||
|
||||
class QSignalDebouncer(GenericSignalThrottler):
|
||||
"""A Signal Debouncer.
|
||||
|
||||
This object's `triggered` signal will not be emitted until `self.timeout()`
|
||||
milliseconds have elapsed since the last time `triggered` was emitted.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
policy: EmissionPolicy = EmissionPolicy.Trailing,
|
||||
parent: Optional[QObject] = None,
|
||||
) -> None:
|
||||
super().__init__(Kind.Debouncer, policy, parent)
|
||||
|
||||
|
||||
# below here part is unique to superqt (not from KD)
|
||||
|
||||
P = ParamSpec("P")
|
||||
R = TypeVar("R")
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing_extensions import Protocol
|
||||
|
||||
class ThrottledCallable(Generic[P, R], Protocol):
|
||||
triggered: SignalInstance
|
||||
|
||||
def cancel(self) -> None:
|
||||
...
|
||||
|
||||
def flush(self) -> None:
|
||||
...
|
||||
|
||||
def set_timeout(self, timeout: int) -> None:
|
||||
...
|
||||
|
||||
if sys.version_info < (3, 9):
|
||||
|
||||
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Future:
|
||||
...
|
||||
|
||||
else:
|
||||
|
||||
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Future[R]:
|
||||
...
|
||||
|
||||
|
||||
@overload
|
||||
def qthrottled(
|
||||
func: Callable[P, R],
|
||||
timeout: int = 100,
|
||||
leading: bool = True,
|
||||
timer_type: Qt.TimerType = Qt.TimerType.PreciseTimer,
|
||||
) -> "ThrottledCallable[P, R]":
|
||||
...
|
||||
|
||||
|
||||
@overload
|
||||
def qthrottled(
|
||||
func: Literal[None] = None,
|
||||
timeout: int = 100,
|
||||
leading: bool = True,
|
||||
timer_type: Qt.TimerType = Qt.TimerType.PreciseTimer,
|
||||
) -> Callable[[Callable[P, R]], "ThrottledCallable[P, R]"]:
|
||||
...
|
||||
|
||||
|
||||
def qthrottled(
|
||||
func: Optional[Callable[P, R]] = None,
|
||||
timeout: int = 100,
|
||||
leading: bool = True,
|
||||
timer_type: Qt.TimerType = Qt.TimerType.PreciseTimer,
|
||||
) -> Union[
|
||||
"ThrottledCallable[P, R]", Callable[[Callable[P, R]], "ThrottledCallable[P, R]"]
|
||||
]:
|
||||
"""Creates a throttled function that invokes func at most once per timeout.
|
||||
|
||||
The throttled function comes with a `cancel` method to cancel delayed func
|
||||
invocations and a `flush` method to immediately invoke them. Options
|
||||
to indicate whether func should be invoked on the leading and/or trailing
|
||||
edge of the wait timeout. The func is invoked with the last arguments provided
|
||||
to the throttled function. Subsequent calls to the throttled function return
|
||||
the result of the last func invocation.
|
||||
|
||||
This decorator may be used with or without parameters.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
func : Callable
|
||||
A function to throttle
|
||||
timeout : int
|
||||
Timeout in milliseconds to wait before allowing another call, by default 100
|
||||
leading : bool
|
||||
Whether to invoke the function on the leading edge of the wait timer,
|
||||
by default True
|
||||
timer_type : Qt.TimerType
|
||||
The timer type. by default `Qt.TimerType.PreciseTimer`
|
||||
One of:
|
||||
- `Qt.PreciseTimer`: Precise timers try to keep millisecond accuracy
|
||||
- `Qt.CoarseTimer`: Coarse timers try to keep accuracy within 5% of the
|
||||
desired interval
|
||||
- `Qt.VeryCoarseTimer`: Very coarse timers only keep full second accuracy
|
||||
"""
|
||||
return _make_decorator(func, timeout, leading, timer_type, Kind.Throttler)
|
||||
|
||||
|
||||
@overload
|
||||
def qdebounced(
|
||||
func: Callable[P, R],
|
||||
timeout: int = 100,
|
||||
leading: bool = False,
|
||||
timer_type: Qt.TimerType = Qt.TimerType.PreciseTimer,
|
||||
) -> "ThrottledCallable[P, R]":
|
||||
...
|
||||
|
||||
|
||||
@overload
|
||||
def qdebounced(
|
||||
func: Literal[None] = None,
|
||||
timeout: int = 100,
|
||||
leading: bool = False,
|
||||
timer_type: Qt.TimerType = Qt.TimerType.PreciseTimer,
|
||||
) -> Callable[[Callable[P, R]], "ThrottledCallable[P, R]"]:
|
||||
...
|
||||
|
||||
|
||||
def qdebounced(
|
||||
func: Optional[Callable[P, R]] = None,
|
||||
timeout: int = 100,
|
||||
leading: bool = False,
|
||||
timer_type: Qt.TimerType = Qt.TimerType.PreciseTimer,
|
||||
) -> Union[
|
||||
"ThrottledCallable[P, R]", Callable[[Callable[P, R]], "ThrottledCallable[P, R]"]
|
||||
]:
|
||||
"""Creates a debounced function that delays invoking `func`.
|
||||
|
||||
`func` will not be invoked until `timeout` ms have elapsed since the last time
|
||||
the debounced function was invoked.
|
||||
|
||||
The debounced function comes with a `cancel` method to cancel delayed func
|
||||
invocations and a `flush` method to immediately invoke them. Options
|
||||
indicate whether func should be invoked on the leading and/or trailing edge
|
||||
of the wait timeout. The func is invoked with the *last* arguments provided to
|
||||
the debounced function. Subsequent calls to the debounced function return the
|
||||
result of the last `func` invocation.
|
||||
|
||||
This decorator may be used with or without parameters.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
func : Callable
|
||||
A function to throttle
|
||||
timeout : int
|
||||
Timeout in milliseconds to wait before allowing another call, by default 100
|
||||
leading : bool
|
||||
Whether to invoke the function on the leading edge of the wait timer,
|
||||
by default False
|
||||
timer_type : Qt.TimerType
|
||||
The timer type. by default `Qt.TimerType.PreciseTimer`
|
||||
One of:
|
||||
- `Qt.PreciseTimer`: Precise timers try to keep millisecond accuracy
|
||||
- `Qt.CoarseTimer`: Coarse timers try to keep accuracy within 5% of the
|
||||
desired interval
|
||||
- `Qt.VeryCoarseTimer`: Very coarse timers only keep full second accuracy
|
||||
"""
|
||||
return _make_decorator(func, timeout, leading, timer_type, Kind.Debouncer)
|
||||
|
||||
|
||||
def _make_decorator(
|
||||
func: Optional[Callable[P, R]],
|
||||
timeout: int,
|
||||
leading: bool,
|
||||
timer_type: Qt.TimerType,
|
||||
kind: Kind,
|
||||
) -> Union[
|
||||
"ThrottledCallable[P, R]", Callable[[Callable[P, R]], "ThrottledCallable[P, R]"]
|
||||
]:
|
||||
def deco(func: Callable[P, R]) -> "ThrottledCallable[P, R]":
|
||||
policy = EmissionPolicy.Leading if leading else EmissionPolicy.Trailing
|
||||
throttle = GenericSignalThrottler(kind, policy)
|
||||
throttle.setTimerType(timer_type)
|
||||
throttle.setTimeout(timeout)
|
||||
last_f = None
|
||||
future: Optional[Future] = None
|
||||
|
||||
@wraps(func)
|
||||
def inner(*args: P.args, **kwargs: P.kwargs) -> Future:
|
||||
nonlocal last_f
|
||||
nonlocal future
|
||||
if last_f is not None:
|
||||
throttle.triggered.disconnect(last_f)
|
||||
if future is not None and not future.done():
|
||||
future.cancel()
|
||||
|
||||
future = Future()
|
||||
last_f = lambda: future.set_result(func(*args, **kwargs)) # noqa
|
||||
throttle.triggered.connect(last_f)
|
||||
throttle.throttle()
|
||||
return future
|
||||
|
||||
setattr(inner, "cancel", throttle.cancel)
|
||||
setattr(inner, "flush", throttle.flush)
|
||||
setattr(inner, "set_timeout", throttle.setTimeout)
|
||||
setattr(inner, "triggered", throttle.triggered)
|
||||
return inner # type: ignore
|
||||
|
||||
return deco(func) if func is not None else deco
|
43
tests/test_throttler.py
Normal file
43
tests/test_throttler.py
Normal file
@@ -0,0 +1,43 @@
|
||||
from unittest.mock import Mock
|
||||
|
||||
from superqt.utils import qdebounced, qthrottled
|
||||
|
||||
|
||||
def test_debounced(qtbot):
|
||||
mock1 = Mock()
|
||||
mock2 = Mock()
|
||||
|
||||
@qdebounced(timeout=5)
|
||||
def f1() -> str:
|
||||
mock1()
|
||||
|
||||
def f2() -> str:
|
||||
mock2()
|
||||
|
||||
for _ in range(10):
|
||||
f1()
|
||||
f2()
|
||||
|
||||
qtbot.wait(5)
|
||||
mock1.assert_called_once()
|
||||
assert mock2.call_count == 10
|
||||
|
||||
|
||||
def test_throttled(qtbot):
|
||||
mock1 = Mock()
|
||||
mock2 = Mock()
|
||||
|
||||
@qthrottled(timeout=5)
|
||||
def f1() -> str:
|
||||
mock1()
|
||||
|
||||
def f2() -> str:
|
||||
mock2()
|
||||
|
||||
for _ in range(10):
|
||||
f1()
|
||||
f2()
|
||||
|
||||
qtbot.wait(5)
|
||||
assert mock1.call_count == 2
|
||||
assert mock2.call_count == 10
|
Reference in New Issue
Block a user