Version

Quick search

Table Of Contents

Source code for kivy.input.recorder

'''
Input recorder
==============

.. versionadded:: 1.1.0

.. warning::

    This part of Kivy is still experimental and this API is subject to
    change in a future version.

This is a class that can record and replay some input events. This can
be used for test cases, screen savers etc.

Once activated, the recorder will listen for any input event and save its
properties in a file with the delta time. Later, you can play the input
file: it will generate fake touch events with the saved properties and
dispatch them to the event loop.

By default, only the position is saved ('pos' profile and 'sx', 'sy'
attributes). Change it only if you understand how input handling works.

Recording events
----------------

The best way is to use the "recorder" module. Check the :doc:`api-kivy.modules`
documentation to see how to activate a module.

Once activated, you can press F8 to start the recording. By default,
events will be written to `<currentpath>/recorder.kvi`. When you want to
stop recording, press F8 again.

You can replay the file by pressing F7.

Check the :doc:`api-kivy.modules.recorder` module for more information.

Manual play
-----------

You can manually open a recorder file, and play it by doing::

    from kivy.input.recorder import Recorder

    rec = Recorder(filename='myrecorder.kvi')
    rec.play = True

If you want to loop over that file, you can do::


    from kivy.input.recorder import Recorder

    def recorder_loop(instance, value):
        if value is False:
            instance.play = True

    rec = Recorder(filename='myrecorder.kvi')
    rec.bind(play=recorder_loop)
    rec.play = True

Recording more attributes
-------------------------

You can extend the attributes to save on one condition: attribute values must
be simple values, not instances of complex classes.

Let's say you want to save the angle and pressure of the touch, if available::

    from kivy.input.recorder import Recorder

    rec = Recorder(filename='myrecorder.kvi',
        record_attrs=['is_touch', 'sx', 'sy', 'angle', 'pressure'],
        record_profile_mask=['pos', 'angle', 'pressure'])
    rec.record = True

Or with module variables::

    $ python main.py -m recorder,attrs=is_touch:sx:sy:angle:pressure, \
            profile_mask=pos:angle:pressure

Known limitations
-----------------

  - Unable to save attributes with instances of complex classes.
  - Values that represent time will not be adjusted.
  - Can replay only complete records. If a begin/update/end event is missing,
    this could lead to ghost touches.
  - Stopping the replay before the end can lead to ghost touches.

'''

__all__ = ('Recorder', )

from os.path import exists
from time import time
from kivy.event import EventDispatcher
from kivy.properties import ObjectProperty, BooleanProperty, StringProperty, \
    NumericProperty, ListProperty
from kivy.input.motionevent import MotionEvent
from kivy.base import EventLoop
from kivy.logger import Logger
from ast import literal_eval
from functools import partial


class RecorderMotionEvent(MotionEvent):
    '''Motion event rebuilt from a recorded entry.

    The recorded attributes are applied directly onto the event instance
    before the regular :meth:`depack` of the parent class is run.
    '''

    def depack(self, args):
        '''Copy every recorded attribute onto this event, then let the
        parent class finish depacking.

        :Parameters:
            `args`: dict
                Mapping of attribute name to recorded value.
        '''
        # Work on a snapshot of the pairs, as the original code did.
        recorded = tuple(args.items())
        for attr_name, attr_value in recorded:
            setattr(self, attr_name, attr_value)
        super(RecorderMotionEvent, self).depack(args)


class Recorder(EventDispatcher):
    '''Recorder class. Please check module documentation for more
    information.

    :Events:
        `on_stop`:
            Fired when the playing stops.

    .. versionchanged:: 1.10.0
        Event `on_stop` added.
    '''

    window = ObjectProperty(None)
    '''Window instance to attach the recorder. If None, it will use the
    default instance.

    :attr:`window` is a :class:`~kivy.properties.ObjectProperty` and
    defaults to None.
    '''

    counter = NumericProperty(0)
    '''Number of events recorded in the last session.

    :attr:`counter` is a :class:`~kivy.properties.NumericProperty` and defaults
    to 0, read-only.
    '''

    play = BooleanProperty(False)
    '''Boolean to start/stop the replay of the current file (if it exists).

    :attr:`play` is a :class:`~kivy.properties.BooleanProperty` and defaults to
    False.
    '''

    record = BooleanProperty(False)
    '''Boolean to start/stop the recording of input events.

    :attr:`record` is a :class:`~kivy.properties.BooleanProperty` and defaults
    to False.
    '''

    filename = StringProperty('recorder.kvi')
    '''Filename to save the output of the recorder.

    :attr:`filename` is a :class:`~kivy.properties.StringProperty` and defaults
    to 'recorder.kvi'.
    '''

    record_attrs = ListProperty(['is_touch', 'sx', 'sy'])
    '''Attributes to record from the motion event.

    :attr:`record_attrs` is a :class:`~kivy.properties.ListProperty` and
    defaults to ['is_touch', 'sx', 'sy'].
    '''

    record_profile_mask = ListProperty(['pos'])
    '''Profile to save in the fake motion event when replayed.

    :attr:`record_profile_mask` is a :class:`~kivy.properties.ListProperty` and
    defaults to ['pos'].
    '''

    # internals
    record_fd = ObjectProperty(None)
    record_time = NumericProperty(0.)

    __events__ = ('on_stop',)

    # Maps a recorded keyboard etype to the window event replayed for it.
    _key_events = {
        'keydown': 'on_key_down',
        'keyup': 'on_key_up',
        'keyboard': 'on_keyboard'}

    def __init__(self, **kwargs):
        '''Create the recorder and hook it onto :attr:`window` (or onto the
        default window when none was given).
        '''
        super(Recorder, self).__init__(**kwargs)
        if self.window is None:
            # manually set the current window
            from kivy.core.window import Window
            self.window = Window

        # Keep the very same partial objects that get bound: release() has
        # to hand them back to unbind(), a freshly built partial (or the
        # bare method) would not match the registered callbacks.
        self._keyboard_handlers = {
            'on_key_up': partial(self.on_keyboard, 'keyup'),
            'on_key_down': partial(self.on_keyboard, 'keydown'),
            'on_keyboard': partial(self.on_keyboard, 'keyboard')}
        self.window.bind(
            on_motion=self.on_motion,
            **self._keyboard_handlers)

    def on_motion(self, window, etype, motionevent):
        '''Window `on_motion` handler: while recording, append one line
        describing the motion event to the record file.

        The line is the repr of ``(delta_time, etype, uid, args)`` where
        `args` holds the attributes listed in :attr:`record_attrs` that the
        event actually has, plus its profile filtered by
        :attr:`record_profile_mask`.
        '''
        if not self.record:
            return

        args = {
            arg: getattr(motionevent, arg)
            for arg in self.record_attrs
            if hasattr(motionevent, arg)}
        args['profile'] = [
            x for x in motionevent.profile
            if x in self.record_profile_mask]

        self.record_fd.write('%r\n' % (
            (time() - self.record_time,
             etype, motionevent.uid, args), ))
        self.counter += 1

    def on_keyboard(self, etype, window, key, *args, **kwargs):
        '''Window keyboard handler: while recording, append one line
        describing the key event to the record file.

        :Parameters:
            `etype`: str
                'keyup', 'keydown' or 'keyboard', pre-filled by the partial
                created in :meth:`__init__`.
            `window`:
                The dispatching window (unused).
            `key`:
                Key value of the event.
            `*args`, `**kwargs`:
                Optional scancode, codepoint (or `unicode`) and modifier,
                either positional (in that order) or as keywords.
        '''
        if not self.record:
            return

        # update() re-dispatches these events with scancode, codepoint and
        # modifier as positional arguments, so accept the same layout here
        # instead of only looking at keyword arguments (which recorded None
        # for every positional value). Keywords take precedence.
        details = dict(zip(('scancode', 'codepoint', 'modifier'), args))
        details.update(kwargs)
        codepoint = details.get('codepoint')
        if codepoint is None:
            codepoint = details.get('unicode')

        self.record_fd.write('%r\n' % (
            (time() - self.record_time,
             etype, 0, {
                 'key': key,
                 'scancode': details.get('scancode'),
                 'codepoint': codepoint,
                 'modifier': details.get('modifier'),
                 'is_touch': False}), ))
        self.counter += 1

    def release(self):
        '''Detach every handler installed on :attr:`window` by
        :meth:`__init__` (motion and the three keyboard events).
        '''
        self.window.unbind(
            on_motion=self.on_motion,
            **self._keyboard_handlers)

    def on_record(self, instance, value):
        '''Property handler for :attr:`record`: open and initialise the
        record file when set to True, close it when set to False.
        '''
        if value:
            # generate a record filename
            self.counter = 0
            self.record_time = time()
            self.record_fd = open(self.filename, 'w')
            self.record_fd.write('#RECORDER1.0\n')
            Logger.info('Recorder: Recording inputs to %r' % self.filename)
        else:
            # record_fd is still None if opening the file failed earlier.
            if self.record_fd is not None:
                self.record_fd.close()
            Logger.info('Recorder: Recorded %d events in %r' %
                        (self.counter, self.filename))

    # needed for acting as an input provider
    def stop(self):
        pass

    def start(self):
        pass

    def on_play(self, instance, value):
        '''Property handler for :attr:`play`: load :attr:`filename` and
        register this recorder as an input provider when set to True,
        unregister it when set to False.

        Loading is aborted (with an error logged) if the file is missing,
        has fewer than two lines or does not start with the expected header.
        '''
        if not value:
            Logger.info('Recorder: Stop playing %r' % self.filename)
            EventLoop.remove_input_provider(self)
            return
        if not exists(self.filename):
            Logger.error('Recorder: Unable to find %r file, play aborted.' % (
                self.filename))
            return

        with open(self.filename, 'r') as fd:
            data = fd.read().splitlines()

        if len(data) < 2:
            Logger.error('Recorder: Unable to play %r, file truncated.' % (
                self.filename))
            return

        if data[0] != '#RECORDER1.0':
            Logger.error('Recorder: Unable to play %r, invalid header.' % (
                self.filename))
            return

        # decompile data; blank lines carry no event and cannot be evaluated
        self.play_data = [literal_eval(x) for x in data[1:] if x.strip()]
        self.play_time = time()
        self.play_me = {}
        Logger.info('Recorder: Start playing %d events from %r' %
                    (len(self.play_data), self.filename))
        EventLoop.add_input_provider(self)

    def on_stop(self):
        '''Default handler of the `on_stop` event; does nothing.'''
        pass

    def update(self, dispatch_fn):
        '''Input provider hook: replay every loaded event whose recorded
        delta time has elapsed since the replay started.

        Motion events are passed to `dispatch_fn(etype, motionevent)`;
        keyboard events are dispatched on :attr:`window`. When no event is
        left, :attr:`play` is reset to False and `on_stop` is dispatched.
        '''
        if not self.play_data:
            Logger.info('Recorder: Playing finished.')
            self.play = False
            self.dispatch('on_stop')
            # Nothing left to replay in this call.
            return

        dt = time() - self.play_time
        while self.play_data:
            event = self.play_data[0]
            assert len(event) == 4
            if event[0] > dt:
                # Events are stored in recording order: the rest is not due.
                return

            me = None
            etype, uid, args = event[1:]
            if etype == 'begin':
                me = RecorderMotionEvent('recorder', uid, args)
                self.play_me[uid] = me
            elif etype == 'update':
                me = self.play_me[uid]
                me.depack(args)
            elif etype == 'end':
                me = self.play_me.pop(uid)
                me.depack(args)
            elif etype in self._key_events:
                self.window.dispatch(
                    self._key_events[etype],
                    args['key'],
                    args['scancode'],
                    args['codepoint'],
                    args['modifier'])

            if me:
                dispatch_fn(etype, me)

            self.play_data.pop(0)
def start(win, ctx):
    '''Module hook: create a :class:`Recorder` attached to `win` and store
    it on `ctx` as `ctx.recorder`.
    '''
    recorder = Recorder(window=win)
    ctx.recorder = recorder


def stop(win, ctx):
    '''Module hook: release the recorder stored on `ctx`, if there is one.
    '''
    if not hasattr(ctx, 'recorder'):
        return
    ctx.recorder.release()