Version

Quick search

Table Of Contents

Source code for kivy.clock

'''
Clock object
============

The :class:`Clock` object allows you to schedule a function call in the
future; once or repeatedly at specified intervals. You can get the time
elapsed between the scheduling and the calling of the callback via the
`dt` argument::

    # dt means delta-time
    def my_callback(dt):
        pass

    # call my_callback every 0.5 seconds
    Clock.schedule_interval(my_callback, 0.5)

    # call my_callback in 5 seconds
    Clock.schedule_once(my_callback, 5)

    # call my_callback as soon as possible (usually next frame.)
    Clock.schedule_once(my_callback)

.. note::

    If the callback returns False, the schedule will be canceled and won't
    repeat.

If you want to schedule a function to call with default arguments, you can use
the `functools.partial
<http://docs.python.org/library/functools.html#functools.partial>`_ python
module::

    from functools import partial

    def my_callback(value, key, *largs):
        pass

    Clock.schedule_interval(partial(my_callback, 'my value', 'my key'), 0.5)

Conversely, if you want to schedule a function that doesn't accept the dt
argument, you can use a `lambda
<http://docs.python.org/2/reference/expressions.html#lambda>`_ expression
to write a short function that does accept dt. For Example::

    def no_args_func():
        print("I accept no arguments, so don't schedule me in the clock")

    Clock.schedule_once(lambda dt: no_args_func(), 0.5)

.. note::

    You cannot unschedule an anonymous function unless you keep a
    reference to it. It's better to add \\*args to your function
    definition so that it can be called with an arbitrary number of
    parameters.

.. important::

    The class method callback is weak-referenced: you are responsible for
    keeping a reference to your original object/callback. If you don't keep a
    reference, the ClockBase will never execute your callback. For
    example::

        class Foo(object):
            def start(self):
                Clock.schedule_interval(self.callback, 0.5)

            def callback(self, dt):
                print('In callback')

        # A Foo object is created and the method start is called.
        # Because no reference is kept to the instance returned from Foo(),
        # the object will be collected by the Python Garbage Collector and
        # your callback will be never called.
        Foo().start()

        # So you should do the following and keep a reference to the instance
        # of foo until you don't need it anymore!
        foo = Foo()
        foo.start()


.. _schedule-before-frame:

Schedule before frame
---------------------

.. versionadded:: 1.0.5

Sometimes you need to schedule a callback BEFORE the next frame. Starting
from 1.0.5, you can use a timeout of -1::

    Clock.schedule_once(my_callback, 0) # call after the next frame
    Clock.schedule_once(my_callback, -1) # call before the next frame

The Clock will execute all the callbacks with a timeout of -1 before the
next frame even if you add a new callback with -1 from a running
callback. However, :class:`Clock` has an iteration limit for these
callbacks: it defaults to 10.

If you schedule a callback that schedules a callback that schedules a ... etc
more than 10 times, it will leave the loop and send a warning to the console,
then continue after the next frame. This is implemented to prevent bugs from
hanging or crashing the application.

If you need to increase the limit, set the :attr:`max_iteration` property::

    from kivy.clock import Clock
    Clock.max_iteration = 20

.. _triggered-events:

Triggered Events
----------------

.. versionadded:: 1.0.5

:meth:`CyClockBase.create_trigger` is an advanced method way to defer a
callback. It functions exactly like :meth:`CyClockBase.schedule_once` and
:meth:`CyClockBase.schedule_interval` except that it doesn't immediately
schedule the callback. Instead, one schedules the callback using the
:class:`ClockEvent` returned by it. This ensures that you can call the event
multiple times but it won't be scheduled more than once. This is not the case
with :meth:`CyClockBase.schedule_once`::

    # will run the callback twice before the next frame
    Clock.schedule_once(my_callback)
    Clock.schedule_once(my_callback)

    # will run the callback once before the next frame
    event = Clock.create_trigger(my_callback)
    event()
    event()

    # will also run the callback only once before the next frame
    event = Clock.schedule_once(my_callback)  # now it's already scheduled
    event()  # won't be scheduled again
    event()

In addition, it is more convenient to create and bind to
the triggered event than using :meth:`CyClockBase.schedule_once` in a
function::

    from kivy.clock import Clock
    from kivy.uix.widget import Widget

    class Sample(Widget):
        def __init__(self, **kwargs):
            self._trigger = Clock.create_trigger(self.cb)
            super(Sample, self).__init__(**kwargs)
            self.bind(x=self._trigger, y=self._trigger)

        def cb(self, *largs):
            pass

Even if x and y changes within one frame, the callback is only run once.

Unscheduling
-------------

An event scheduled with :meth:`CyClockBase.schedule_once`,
:meth:`CyClockBase.schedule_interval`, or with
:meth:`CyClockBase.create_trigger` and then triggered can be unscheduled in
multiple ways. E.g::

    def my_callback(dt):
        pass

    # call my_callback every 0.5 seconds
    event = Clock.schedule_interval(my_callback, 0.5)

    # call my_callback in 5 seconds
    event2 = Clock.schedule_once(my_callback, 5)

    event_trig = Clock.create_trigger(my_callback, 5)
    event_trig()

    # unschedule using cancel
    event.cancel()

    # unschedule using Clock.unschedule
    Clock.unschedule(event2)

    # unschedule using Clock.unschedule with the callback
    # NOT RECOMMENDED
    Clock.unschedule(my_callback)

The best way to unschedule a callback is with :meth:`ClockEvent.cancel`.
:meth:`CyClockBase.unschedule` is mainly an alias for that for that function.
However, if the original callback itself is passed to
:meth:`CyClockBase.unschedule`, it'll unschedule all instances of that
callback (provided ``all`` is True, the default, otherwise only the first match
is removed).

Calling :meth:`CyClockBase.unschedule` on the original callback is highly
discouraged because it's significantly slower than when using the event.

Clock Lifecycle
---------------

Kivy's clock has a lifecycle. By default, scheduling a callback after the Clock
has ended will not raise an error, even though the callback may never be
called. That's because most callbacks are like services, e.g. responding to a
user button press - if the app is running the callbacks need to service the app
and respond to the input, but once the app has stopped or is stopping, we can
safely not process these events.

Other events always need to be processed. E.g. another thread may request a
callback in kivy's thread and then process some result. If the event is not
processed in Kivy's thread because the app stopped, the second thread may
block forever hanging the application as it exits.

Consequently, we provide a API
(:meth:`CyClockBase.create_lifecycle_aware_trigger`) for scheduling callbacks
that raise a :class:`ClockNotRunningError` if the clock has stopped. If the
scheduling succeeded it guarantees that one of its callbacks will be called.
I.e. the new :meth:`CyClockBase.create_lifecycle_aware_trigger` accepts an
additional ``clock_ended_callback`` parameter. Normally, ``callback`` will be
called when the event is processed. But, if the clock is stopped before it can
be processed, if the application exited normally (and the app was started) and
the event wasn't canceled, and the callbacks are not garbage collected, then
``clock_ended_callback`` will be called instead when the clock is stopped.

That is, given these conditions, if :class:`ClockNotRunningError` was not
raised when the event was scheduled, then one of these callbacks will be
called - either ``callback`` if the event executed normally, or
``clock_ended_callback`` if the clock is stopped while the event is scheduled.

By default, events can be scheduled before the clock is started because it is
assumed the clock will eventually be started when the app starts. I.e.
calling :meth:`CyClockBase.create_lifecycle_aware_trigger` before the clock
and application starts will succeed. But if the app never actually starts, then
neither of the callbacks may be executed.

.. versionadded:: 2.0.0
    The lifecycle was added in 2.0.0

Exception Handling
------------------

Kivy provides a exception handling manager,
:attr:`~kivy.base.ExceptionManager`, to handle its internal exceptions
including exceptions raised by clock callbacks, without crashing the
application. By default when an exception is raised, the app will crash.
But, if a handler is registered with the exception manager and the handler
handles the exception, the app will not crash and will continue as normal.::

    from kivy.base import ExceptionHandler, ExceptionManager
    class MyHandler(ExceptionHandler):
        def handle_exception(self, inst):
            if isinstance(inst, ValueError):
                Logger.exception('ValueError caught by MyHandler')
                return ExceptionManager.PASS
            return ExceptionManager.RAISE

    ExceptionManager.add_handler(MyHandler())

Then, all ValueError exceptions will be logged to the console and ignored.
Similarly, if a scheduled clock callback raises a ValueError, other clock
events will still be processed normally.

If an event's callback raises an exception, before the exception handler is
executed, the callback is immediately canceled.

It still is possible for the app to be corrupted if kivy itself is the source
of the exception. I.e. even with a handler that ignores exceptions and doesn't
crash, the app may be in a corrupted state if the error originates from within
Kivy itself. However, the exception handler can help protect the app from
crashing and can help protect against user callbacks crashing the app.

.. versionchanged:: 2.0.0
    Prior to Kivy 2.0.0, an exception raised in a event's callback would
    cause the clock to crash and subsequent events may or may not be executed.
    Even if the exception was handled by an
    :class:`~kivy.base.ExceptionHandler`, there was no guarantee that some
    scheduled events would not be skipped.

    From 2.0.0 onward, if a event's exception is handled by an
    :class:`~kivy.base.ExceptionHandler`, other events will be shielded from
    the exception and will execute normally.

Scheduling from ``__del__``
---------------------------

It is not safe to schedule Clock events from a object's ``__del__`` or
``__dealloc__`` method. If you must schedule a Clock call from this method, use
:meth:`CyClockBase.schedule_del_safe` or
:meth:`CyClockBase.schedule_lifecycle_aware_del_safe` instead.

Threading and Callback Order
-----------------------------

Beginning with 1.10.0, all the events scheduled for the same frame, e.g.
all the events scheduled in the same frame with a ``timeout`` of ``0``,
well be executed in the order they were scheduled.

Also, all the scheduling and canceling methods are fully thread safe and
can be safely used from external threads.

As a a consequence, calling :meth:`CyClockBase.unschedule` with the original
callback is now significantly slower and highly discouraged. Instead, the
returned events should be used to cancel. As a tradeoff, all the other methods
are now significantly faster than before.

Advanced Clock Details
-----------------------

The following section goes into the internal kivy clock details as well
as the various clock options. It is meant only for advanced users.

Fundamentally, the Kivy clock attempts to execute any scheduled callback
rhythmically as determined by the specified fps (frame per second, see
``maxfps`` in :mod:`~kivy.config`). That is, ideally, given e.g. a desired fps
of 30, the clock will execute the callbacks at intervals of 1 / 30 seconds, or
every 33.33 ms. All the callbacks in a frame are given the same timestamp,
i.e. the ``dt`` passed to the callback are all the same and it's the difference
in time between the start of this and the previous frame.

Because of inherent indeterminism, the frames do not actually occur exactly
at intervals of the fps and ``dt`` may be under or over the desired fps.
Also, once the timeout is "close enough" to the desired timeout, as determined
internally, Kivy will execute the callback in the current frame even when the
"actual time" has not elapsed the ``timeout`` amount.

Kivy offers now, since ``1.10.0``, multiple clocks with different behaviors.

Default Clock
^^^^^^^^^^^^^^

The default clock (``default``) behaves as described above. When a callback
with a timeout of zero or non-zero is scheduled, they are executed at the frame
that is near the timeout, which is a function of the fps. So a timeout of zero
would still result in a delay of one frame or about 1 / fps, typically a bit
less but sometimes more depending on the CPU usage of the other events
scheduled for that frame.

In a test using a fps of 30, a callback with a timeout of 0, 0.001, and 0.05,
resulted in a mean callback delay of 0.02487, 0.02488, and 0.05011 seconds,
respectively. When tested with a fps of 600 the delay for 0.05 was similar,
except the standard deviation was reduced resulting in overall better accuracy.

Interruptible Clock
^^^^^^^^^^^^^^^^^^^^

The default clock suffers from the quantization problem, as frames occur only
on intervals and any scheduled timeouts will not be able to occur during an
interval. For example, with the timeout of 0.05, while the mean was 0.05011,
its values ranged between 0.02548 - 0.07348 and a standard deviation of 0.002.
Also, there's the minimum timeout of about 0.02487.

The interruptible clock (``interrupt``) will execute timeouts even during a
frame. So a timeout of zero will execute as quickly as possible and similarly
a non-zero timeout will be executed even during the interval.

This clock, and all the clocks described after this have an option,
:attr:`ClockBaseInterruptBehavior.interupt_next_only`. When True, any of the
behavior new behavior will only apply to the callbacks with a timeout of
zero. Non-zero timeouts will behave like in the default clock. E.g. for this
clock when True, only zero timeouts will execute during the the interval.

In a test using a fps of 30, a callback with a timeout of 0, 0.001, and 0.05,
resulted in a mean callback delay of 0.00013, 0.00013, and 0.04120 seconds,
respectively when :attr:`ClockBaseInterruptBehavior.interupt_next_only` was
False. Also, compared to the default clock the standard deviation was reduced.
When :attr:`ClockBaseInterruptBehavior.interupt_next_only` was True, the values
were 0.00010, 0.02414, and 0.05034, respectively.

Free Clock
^^^^^^^^^^^

The interruptible clock may not be ideal for all cases because all the events
are executed during the intervals and events are not executed anymore
rhythmically as multiples of the fps. For example, there may not be any benefit
for the graphics to update in a sub-interval, so the additional accuracy
wastes CPU.

The Free clock (``free_all``) solves this by having ``Clock.xxx_free`` versions
of all the Clock scheduling methods. By free, we mean the event is free from
the fps because it's not fps limited. E.g.
:meth:`CyClockBaseFree.create_trigger_free` corresponds to
:meth:`CyClockBase.create_trigger`. Only when an event scheduled using the
``Clock.xxx_free`` methods is present will the clock interrupt and execute
the events during the interval. So, if no ``free`` event is present the clock
behaves like the ``default`` clock, otherwise it behaves like the ``interrupt``
clock.

In a test using a fps of 30, a callback with a timeout of 0s, 0.001s, and
0.05s, resulted in a mean callback delay of 0.00012s, 0.00017s, and 0.04121s
seconds, respectively when it was a free event and 0.02403s, 0.02405s, and
0.04829s, respectively when it wasn't.

Free Only Clock
^^^^^^^^^^^^^^^^^

The Free clock executes all events when a free event was scheduled. This
results in normal events also being execute in the middle of the interval
when a free event is scheduled. For example, above, when a free event was
absent, a normal event with a 0.001s timeout was delayed for 0.02405s. However,
if a free event happened to be also scheduled, the normal event was only
delayed 0.00014s, which may be undesirable.

The Free only clock (``free_only``) solves it by only executing free events
during the interval and normal events are always executed like with the
default clock. For example, in the presence of a free event, a normal event
with a timeout of 0.001s still had a delay of 0.02406. So this clock,
treats free and normal events independently, with normal events always being
fps limited, but never the free events.

Summary
^^^^^^^^

The kivy clock type to use can be set with the ``kivy_clock`` option the
:mod:`~kivy.config`. If ``KIVY_CLOCK`` is present in the environment it
overwrites the config selection. Its possible values are as follows:

* When ``kivy_clock`` is ``default``, the normal clock, :class:`ClockBase`,
  which limits callbacks to the maxfps quantization - is used.
* When ``kivy_clock`` is ``interrupt``, a interruptible clock,
  :class:`ClockBaseInterrupt`, which doesn't limit any callbacks to the
  maxfps - is used. Callbacks will be executed at any time.
* When ``kivy_clock`` is ``free_all``, a interruptible clock,
  :class:`ClockBaseFreeInterruptAll`, which doesn't limit any callbacks to the
  maxfps in the presence of free events, but in their absence it limits events
  to the fps quantization interval - is used.
* When ``kivy_clock`` is ``free_only``, a interruptible clock,
  :class:`ClockBaseFreeInterruptAll`, which treats free and normal events
  independently; normal events are fps limited while free events are not - is
  used.

Async clock support
-------------------

.. versionadded:: 2.0.0

Experimental async support has been added in 2.0.0. The Clock now has a
:meth:`ClockBaseBehavior.async_tick` and :meth:`ClockBaseBehavior.async_idle`
coroutine method which is used by the kivy EventLoop when the kivy EventLoop is
executed in a asynchronous manner. When used, the kivy clock does not
block while idling.

The async library to use is selected with the `KIVY_EVENTLOOP` environmental
variable or by  calling :meth:`~kivy.clock.ClockBaseBehavior.init_async_lib`
directly. The library can be one of `"asyncio"` when the standard library
`asyncio` should be used, or `"trio"` if the trio library
should be used. If not set it defaults to `"asyncio"`.

See :mod:`~kivy.app` for example usage.
'''

__all__ = (
    'Clock', 'ClockNotRunningError', 'ClockEvent', 'FreeClockEvent',
    'CyClockBase', 'CyClockBaseFree',
    'ClockBaseBehavior', 'ClockBaseInterruptBehavior',
    'ClockBaseInterruptFreeBehavior', 'ClockBase', 'ClockBaseInterrupt',
    'ClockBaseFreeInterruptAll', 'ClockBaseFreeInterruptOnly', 'mainthread')

from sys import platform
from os import environ
from functools import wraps, partial
from kivy.context import register_context
from kivy.config import Config
from kivy.logger import Logger
from kivy.compat import clock as _default_time
import time
try:
    from kivy._clock import CyClockBase, ClockEvent, FreeClockEvent, \
        CyClockBaseFree, ClockNotRunningError
except ImportError:
    Logger.error(
        'Clock: Unable to import kivy._clock. Have you perhaps forgotten to '
        'compile kivy? Kivy contains Cython code which needs to be compiled. '
        'A missing kivy._clock often indicates the Cython code has not been '
        'compiled. Please follow the installation instructions and make sure '
        'to compile Kivy')
    raise

from threading import Event as ThreadingEvent

# some reading: http://gameprogrammingpatterns.com/game-loop.html


def _get_sleep_obj():
    pass


try:
    import ctypes
    if platform in ('win32', 'cygwin'):
        # Win32 Sleep function is only 10-millisecond resolution, so
        # instead use a waitable timer object, which has up to
        # 100-nanosecond resolution (hardware and implementation
        # dependent, of course).

        _kernel32 = ctypes.windll.kernel32

        def _get_sleep_obj():  # noqa: F811
            return _kernel32.CreateWaitableTimerA(None, True, None)

        def _usleep(microseconds, obj=None):
            delay = ctypes.c_longlong(int(-microseconds * 10))
            _kernel32.SetWaitableTimer(
                obj, ctypes.byref(delay), 0,
                ctypes.c_void_p(), ctypes.c_void_p(), False)
            _kernel32.WaitForSingleObject(obj, 0xffffffff)
    else:
        if platform == 'darwin':
            _libc = ctypes.CDLL('libc.dylib')
        else:
            from ctypes.util import find_library
            _libc = ctypes.CDLL(find_library('c'), use_errno=True)

            def _libc_clock_gettime_wrapper():
                from os import strerror

                class struct_tv(ctypes.Structure):
                    _fields_ = [('tv_sec', ctypes.c_long),
                                ('tv_usec', ctypes.c_long)]

                _clock_gettime = _libc.clock_gettime
                _clock_gettime.argtypes = [ctypes.c_long,
                                           ctypes.POINTER(struct_tv)]

                if 'linux' in platform:
                    _clockid = 4  # CLOCK_MONOTONIC_RAW (Linux specific)
                elif 'freebsd' in platform:
                    # clockid constants from sys/time.h
                    # _clockid = 4 # CLOCK_MONOTONIC (FreeBSD specific)
                    # 11: CLOCK_MONOTONIC_PRECISE (FreeBSD known OK for 10.2)
                    _clockid = 11
                    # _clockid = 12
                    # 12: CLOCK_MONOTONIC_FAST (FreeBSD specific)
                    Logger.debug('clock.py: {{{:s}}} clock ID {:d}'.format(
                        platform, _clockid))
                elif 'openbsd' in platform:
                    _clockid = 3  # CLOCK_MONOTONIC
                else:
                    _clockid = 1  # CLOCK_MONOTONIC

                tv = struct_tv()

                def _time():
                    if _clock_gettime(ctypes.c_long(_clockid),
                                      ctypes.pointer(tv)) != 0:
                        _ernno = ctypes.get_errno()
                        raise OSError(_ernno, strerror(_ernno))
                    return tv.tv_sec + (tv.tv_usec * 0.000000001)

                return _time

            _default_time = _libc_clock_gettime_wrapper()  # noqa: F811

        _libc.usleep.argtypes = [ctypes.c_ulong]
        _libc_usleep = _libc.usleep

        def _usleep(microseconds, obj=None):
            _libc_usleep(int(microseconds))

except (OSError, ImportError, AttributeError):
    # ImportError: ctypes is not available on python-for-android.
    # AttributeError: ctypes is now available on python-for-android, but
    #   "undefined symbol: clock_gettime". CF #3797
    # OSError: if the libc cannot be readed (like with buildbot: invalid ELF
    # header)

    def _usleep(microseconds, obj=None):
        time.sleep(microseconds / 1000000.)


[docs]class ClockBaseBehavior(object): '''The base of the kivy clock. :parameters: `async_lib`: string The async library to use when the clock is run asynchronously. Can be one of, `"asyncio"` when the standard library asyncio should be used, or `"trio"` if the trio library should be used. It defaults to `'asyncio'` or the value in the environmental variable `KIVY_EVENTLOOP` if set. :meth:`init_async_lib` can also be called directly to set the library. ''' _dt = 0.0001 _last_fps_tick = None _start_tick = 0 _fps = 0 _rfps = 0 _fps_counter = 0 _rfps_counter = 0 _frames = 0 _frames_displayed = 0 _events_duration = 0 '''The measured time that it takes to process all the events etc, excepting any sleep or waiting time. It is the average and is updated every 5 seconds. ''' _duration_count = 0 _sleep_time = 0 _duration_ts0 = 0 MIN_SLEEP = 0.005 '''The minimum time to sleep. If the remaining time is less than this, the event loop will continue. ''' SLEEP_UNDERSHOOT = MIN_SLEEP - 0.001 _async_lib = None _async_wait_for = None def __init__(self, async_lib='asyncio', **kwargs): self.init_async_lib(async_lib) super(ClockBaseBehavior, self).__init__(**kwargs) self._duration_ts0 = self._start_tick = self._last_tick = self.time() self._max_fps = float(Config.getint('graphics', 'maxfps'))
[docs] def init_async_lib(self, lib): """Manually sets the async library to use internally, when running in a asynchronous manner. This can be called anytime before the kivy event loop has started, but not once the kivy App is running. :parameters: `lib`: string The async library to use when the clock is run asynchronously. Can be one of, `"asyncio"` when the standard library asyncio should be used, or `"trio"` if the trio library should be used. """ if lib == 'trio': import trio self._async_lib = trio async def wait_for(coro, t): with trio.move_on_after(t): await coro self._async_wait_for = wait_for elif lib == 'asyncio': import asyncio self._async_lib = asyncio self._async_wait_for = asyncio.wait_for else: raise ValueError('async library {} not recognized'.format(lib))
@property def frametime(self): '''Time spent between the last frame and the current frame (in seconds). .. versionadded:: 1.8.0 ''' return self._dt @property def frames(self): '''Number of internal frames (not necessarily drawed) from the start of the clock. .. versionadded:: 1.8.0 ''' return self._frames @property def frames_displayed(self): '''Number of displayed frames from the start of the clock. ''' return self._frames_displayed
[docs] def usleep(self, microseconds): '''Sleeps for the number of microseconds. ''' pass
[docs] def idle(self): '''(internal) waits here until the next frame. ''' fps = self._max_fps if fps > 0: min_sleep = self.get_resolution() undershoot = 4 / 5. * min_sleep usleep = self.usleep ready = self._check_ready done, sleeptime = ready(fps, min_sleep, undershoot) while not done: usleep(1000000 * sleeptime) done, sleeptime = ready(fps, min_sleep, undershoot) current = self.time() self._dt = current - self._last_tick self._last_tick = current return current
[docs] async def async_idle(self): '''(internal) async version of :meth:`idle`. ''' fps = self._max_fps if fps > 0: min_sleep = self.get_resolution() undershoot = 4 / 5. * min_sleep ready = self._check_ready slept = False done, sleeptime = ready(fps, min_sleep, undershoot) while not done: slept = True await self._async_lib.sleep(sleeptime) done, sleeptime = ready(fps, min_sleep, undershoot) if not slept: await self._async_lib.sleep(0) else: await self._async_lib.sleep(0) current = self.time() self._dt = current - self._last_tick self._last_tick = current return current
def _check_ready(self, fps, min_sleep, undershoot): sleeptime = 1 / fps - (self.time() - self._last_tick) return sleeptime - undershoot <= min_sleep, sleeptime - undershoot
[docs] def tick(self): '''Advance the clock to the next step. Must be called every frame. The default clock has a tick() function called by the core Kivy framework.''' self.pre_idle() ts = self.time() self.post_idle(ts, self.idle())
[docs] async def async_tick(self): '''async version of :meth:`tick`. ''' self.pre_idle() ts = self.time() current = await self.async_idle() self.post_idle(ts, current)
[docs] def pre_idle(self): '''Called before :meth:`idle` by :meth:`tick`. ''' self._release_references()
[docs] def post_idle(self, ts, current): '''Called after :meth:`idle` by :meth:`tick`. ''' # tick the current time self._frames += 1 self._fps_counter += 1 # compute how long the event processing takes self._duration_count += 1 self._sleep_time += current - ts t_tot = current - self._duration_ts0 if t_tot >= 1.: self._events_duration = \ (t_tot - self._sleep_time) / float(self._duration_count) self._duration_ts0 = current self._sleep_time = self._duration_count = 0 # calculate fps things if self._last_fps_tick is None: self._last_fps_tick = current elif current - self._last_fps_tick > 1: d = float(current - self._last_fps_tick) self._fps = self._fps_counter / d self._rfps = self._rfps_counter self._last_fps_tick = current self._fps_counter = 0 self._rfps_counter = 0 # process event self._process_events() return self._dt
[docs] def tick_draw(self): '''Tick the drawing counter. ''' self._process_events_before_frame() self._rfps_counter += 1 self._frames_displayed += 1
[docs] def get_fps(self): '''Get the current average FPS calculated by the clock. ''' return self._fps
[docs] def get_rfps(self): '''Get the current "real" FPS calculated by the clock. This counter reflects the real framerate displayed on the screen. In contrast to get_fps(), this function returns a counter of the number of frames, not the average of frames per second. ''' return self._rfps
[docs] def get_time(self): '''Get the last tick made by the clock.''' return self._last_tick
[docs] def get_boottime(self): '''Get the time in seconds from the application start.''' return self._last_tick - self._start_tick
time = staticmethod(partial(_default_time)) def handle_exception(self, e): from kivy.base import ExceptionManager if ExceptionManager.handle_exception(e) == ExceptionManager.RAISE: raise
ClockBaseBehavior.time.__doc__ = \ '''Proxy method for :func:`~kivy.compat.clock`. '''
[docs]class ClockBaseInterruptBehavior(ClockBaseBehavior): '''A kivy clock which can be interrupted during a frame to execute events. ''' interupt_next_only = False _event = None _async_event = None _get_min_timeout_func = None def __init__(self, interupt_next_only=False, **kwargs): super(ClockBaseInterruptBehavior, self).__init__(**kwargs) self._event = ThreadingEvent() self.interupt_next_only = interupt_next_only self._get_min_timeout_func = self.get_min_timeout
[docs] def init_async_lib(self, lib): super(ClockBaseInterruptBehavior, self).init_async_lib(lib) if lib == 'trio': import trio self._async_event = trio.Event() # we don't know if this is called after things have already been # scheduled, so don't delay for a full frame before processing # events self._async_event.set() elif lib == 'asyncio': import asyncio self._async_event = asyncio.Event() self._async_event.set()
[docs] def usleep(self, microseconds): self._event.clear() self._event.wait(microseconds / 1000000.)
async def async_usleep(self, microseconds): self._async_event.clear() await self._async_wait_for( self._async_event.wait(), microseconds / 1000000.) def on_schedule(self, event): fps = self._max_fps if not fps: return if not event.timeout or ( not self.interupt_next_only and event.timeout <= 1 / fps - # remaining time (self.time() - self._last_tick) + # elapsed time 4 / 5. * self.get_resolution()): # resolution fudge factor self._event.set() if self._async_event: self._async_event.set()
[docs] def idle(self): fps = self._max_fps event = self._event resolution = self.get_resolution() if fps > 0: done, sleeptime = self._check_ready( fps, resolution, 4 / 5. * resolution, event) if not done: event.wait(sleeptime) current = self.time() self._dt = current - self._last_tick self._last_tick = current event.clear() # anything scheduled from now on, if scheduled for the upcoming frame # will cause a timeout of the event on the next idle due to on_schedule # `self._last_tick = current` must happen before clear, otherwise the # on_schedule computation is wrong when exec between the clear and # the `self._last_tick = current` bytecode. return current
[docs] async def async_idle(self): fps = self._max_fps event = self._async_event resolution = self.get_resolution() if fps > 0: done, sleeptime = self._check_ready( fps, resolution, 4 / 5. * resolution, event) if not done: await self._async_wait_for(event.wait(), sleeptime) else: await self._async_lib.sleep(0) else: await self._async_lib.sleep(0) current = self.time() self._dt = current - self._last_tick self._last_tick = current event.clear() # anything scheduled from now on, if scheduled for the upcoming frame # will cause a timeout of the event on the next idle due to on_schedule # `self._last_tick = current` must happen before clear, otherwise the # on_schedule computation is wrong when exec between the clear and # the `self._last_tick = current` bytecode. return current
def _check_ready(self, fps, min_sleep, undershoot, event): if event.is_set(): return True, 0 t = self._get_min_timeout_func() if not t: return True, 0 if not self.interupt_next_only: curr_t = self.time() sleeptime = min(1 / fps - (curr_t - self._last_tick), t - curr_t) else: sleeptime = 1 / fps - (self.time() - self._last_tick) return sleeptime - undershoot <= min_sleep, sleeptime - undershoot
[docs]class ClockBaseInterruptFreeBehavior(ClockBaseInterruptBehavior): '''A base class for the clock that interrupts the sleep interval for free events. ''' def __init__(self, **kwargs): super(ClockBaseInterruptFreeBehavior, self).__init__(**kwargs) self._get_min_timeout_func = self.get_min_free_timeout def on_schedule(self, event): if not event.free: # only wake up for free events return # free events should use real time not frame time event._last_dt = self.time() return super(ClockBaseInterruptFreeBehavior, self).on_schedule(event)
[docs]class ClockBase(ClockBaseBehavior, CyClockBase): '''The ``default`` kivy clock. See module for details. ''' _sleep_obj = None def __init__(self, **kwargs): super(ClockBase, self).__init__(**kwargs) self._sleep_obj = _get_sleep_obj()
[docs] def usleep(self, microseconds): _usleep(microseconds, self._sleep_obj)
[docs]class ClockBaseInterrupt(ClockBaseInterruptBehavior, CyClockBase): '''The ``interrupt`` kivy clock. See module for details. ''' pass
[docs]class ClockBaseFreeInterruptAll( ClockBaseInterruptFreeBehavior, CyClockBaseFree): '''The ``free_all`` kivy clock. See module for details. ''' pass
[docs]class ClockBaseFreeInterruptOnly( ClockBaseInterruptFreeBehavior, CyClockBaseFree): '''The ``free_only`` kivy clock. See module for details. '''
[docs] def idle(self): fps = self._max_fps current = self.time() event = self._event if fps > 0: min_sleep = self.get_resolution() usleep = self.usleep undershoot = 4 / 5. * min_sleep min_t = self.get_min_free_timeout interupt_next_only = self.interupt_next_only sleeptime = 1 / fps - (current - self._last_tick) while sleeptime - undershoot > min_sleep: if event.is_set(): do_free = True else: t = min_t() if not t: do_free = True elif interupt_next_only: do_free = False else: sleeptime = min(sleeptime, t - current) do_free = sleeptime - undershoot <= min_sleep if do_free: event.clear() self._process_free_events(current) else: event.wait(sleeptime - undershoot) current = self.time() sleeptime = 1 / fps - (current - self._last_tick) self._dt = current - self._last_tick self._last_tick = current event.clear() # this needs to stay after _last_tick return current
[docs] async def async_idle(self): fps = self._max_fps current = self.time() event = self._async_event if fps > 0: min_sleep = self.get_resolution() usleep = self.usleep undershoot = 4 / 5. * min_sleep min_t = self.get_min_free_timeout interupt_next_only = self.interupt_next_only sleeptime = 1 / fps - (current - self._last_tick) slept = False while sleeptime - undershoot > min_sleep: if event.is_set(): do_free = True else: t = min_t() if not t: do_free = True elif interupt_next_only: do_free = False else: sleeptime = min(sleeptime, t - current) do_free = sleeptime - undershoot <= min_sleep if do_free: event.clear() self._process_free_events(current) else: slept = True await self._async_wait_for( event.wait(), sleeptime - undershoot) current = self.time() sleeptime = 1 / fps - (current - self._last_tick) if not slept: await self._async_lib.sleep(0) else: await self._async_lib.sleep(0) self._dt = current - self._last_tick self._last_tick = current event.clear() # this needs to stay after _last_tick return current
[docs]def mainthread(func): '''Decorator that will schedule the call of the function for the next available frame in the mainthread. It can be useful when you use :class:`~kivy.network.urlrequest.UrlRequest` or when you do Thread programming: you cannot do any OpenGL-related work in a thread. Please note that this method will return directly and no result can be returned:: @mainthread def callback(self, *args): print('The request succeeded!', 'This callback is called in the main thread.') self.req = UrlRequest(url='http://...', on_success=callback) .. versionadded:: 1.8.0 ''' @wraps(func) def delayed_func(*args, **kwargs): def callback_func(dt): func(*args, **kwargs) Clock.schedule_once(callback_func, 0) return delayed_func
def triggered(timeout=0, interval=False): '''Decorator that will trigger the call of the function at the specified timeout, through the method :meth:`CyClockBase.create_trigger`. Subsequent calls to the decorated function (while the timeout is active) are ignored. It can be helpful when an expensive funcion (i.e. call to a server) can be triggered by different methods. Setting a proper timeout will delay the calling and only one of them wil be triggered. @triggered(timeout, interval=False) def callback(id): print('The callback has been called with id=%d' % id) >> callback(id=1) >> callback(id=2) The callback has been called with id=2 The decorated callback can also be unscheduled using: >> callback.cancel() .. versionadded:: 1.10.1 ''' def wrapper_triggered(func): _args = [] _kwargs = {} def cb_function(dt): func(*tuple(_args), **_kwargs) cb_trigger = Clock.create_trigger( cb_function, timeout=timeout, interval=interval) @wraps(func) def trigger_function(*args, **kwargs): _args[:] = [] _args.extend(list(args)) _kwargs.clear() _kwargs.update(kwargs) cb_trigger() def trigger_cancel(): cb_trigger.cancel() setattr(trigger_function, 'cancel', trigger_cancel) return trigger_function return wrapper_triggered if 'KIVY_DOC_INCLUDE' in environ: #: Instance of :class:`ClockBaseBehavior`. Clock: ClockBase = None else: _classes = {'default': ClockBase, 'interrupt': ClockBaseInterrupt, 'free_all': ClockBaseFreeInterruptAll, 'free_only': ClockBaseFreeInterruptOnly} _clk = environ.get('KIVY_CLOCK', Config.get('kivy', 'kivy_clock')) if _clk not in _classes: raise Exception( '{} is not a valid kivy clock. Valid clocks are {}'.format( _clk, sorted(_classes.keys()))) Clock: ClockBase = register_context( 'Clock', _classes[_clk], async_lib=environ.get('KIVY_EVENTLOOP', 'asyncio')) '''The kivy Clock instance. See module documentation for details. '''