Quick search

Table Of Contents

Source code for kivy.lib.osc.OSC

# Open SoundControl for Python
# Copyright (C) 2002 Daniel Holth, Clinton McChesney
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
# For questions regarding this module contact
# Daniel Holth <dholth@stetson.edu> or visit
# http://www.stetson.edu/~ProctoLogic/
# Changelog:
# 15 Nov. 2001:
#   Removed dependency on Python 2.0 features.
#   - dwh
# 13 Feb. 2002:
#   Added a generic callback handler.
#   - dwh

import struct
import math
import sys
import string
import pprint
from kivy.compat import string_types

class impulse(object):
    """Marker object standing for an OSC "Impulse" argument.

    Instances carry no data; they are always truthy and render as the text
    ``Impulse``.
    """

    def __nonzero__(self):
        # Python 2 truth-value hook.
        return True

    def __repr__(self):
        return "Impulse"

    # str() and repr() produce the same text.
    __str__ = __repr__

class null(object):
    """Marker object standing for an OSC "Null" argument.

    Instances carry no data; they are always falsy and render as the text
    ``NULL``.
    """

    def __bool__(self):
        # Python 3 truth-value hook; without it every instance is truthy.
        return False

    # Python 2 looks up the truth value under this name instead.
    __nonzero__ = __bool__

    def __str__(self):
        return "NULL"

    def __repr__(self):
        return "NULL"

def hexDump(data):
    """Print ``data`` to stdout as a hex dump, eight values per row.

    Every element is written as a two-character hexadecimal number followed
    by a space.  After each eighth element the ``repr`` of those eight
    elements is printed, which also ends the row.  An unfinished last row is
    followed by padding and the ``repr`` of just the leftover elements.

    :param data: a text string, ``bytes`` or ``bytearray``; may be empty.
    """
    total = len(data)
    for index in range(total):
        item = data[index]
        # Indexing bytes/bytearray on Python 3 already yields an int; only
        # one-character strings need ord().
        code = item if isinstance(item, int) else ord(item)
        sys.stdout.write("%2x " % code)
        if (index + 1) % 8 == 0:
            print(repr(data[index - 7:index + 1]))

    leftover = total % 8
    if leftover:
        # Slice from the start of the unfinished row so no element that was
        # already shown on the previous row is repeated.
        print(str.rjust("", 11), repr(data[total - leftover:]))
class OSCMessage(object):
    """Builds typetagged OSC messages.

    A message keeps three pieces of state:

    * ``address``  -- the OSC address, initially the empty string;
    * ``typetags`` -- the typetag string, initially ``b","``;
    * ``message``  -- the encoded arguments appended so far, initially empty.
    """

    def __init__(self):
        self.address = ""
        self.clearData()

    def setAddress(self, address):
        """Replace the OSC address of this message."""
        self.address = address

    def setMessage(self, message):
        """Replace the encoded argument data wholesale."""
        self.message = message

    def setTypetags(self, typetags):
        """Replace the typetag string wholesale."""
        self.typetags = typetags

    def clear(self):
        """Reset the address as well as the typetags and argument data."""
        self.address = ""
        self.clearData()

    def clearData(self):
        """Reset the typetags to ``b","`` and drop all argument data."""
        self.typetags = b","
        self.message = bytes()

    def append(self, argument, typehint=None):
        """Appends data to the message, updating the typetags based on
        the argument's type.

        If the argument is a blob (counted string) pass in 'b' as typehint;
        it is then encoded with ``OSCBlob`` instead of ``OSCArgument``.
        """
        if typehint == 'b':
            binary = OSCBlob(argument)
        else:
            binary = OSCArgument(argument)

        # Each encoder returns a (typetag, data) pair.
        self.typetags = self.typetags + binary[0]
        self.rawAppend(binary[1])

    def rawAppend(self, data):
        """Appends raw data to the message.  Use append()."""
        self.message = self.message + data

    def getBinary(self):
        """Returns the binary message (so far) with typetags.

        The result is the encoded address, followed by the encoded typetag
        string, followed by the argument data.
        """
        address = OSCArgument(self.address)[1]
        typetags = OSCArgument(self.typetags)[1]
        return address + typetags + self.message

    def __repr__(self):
        binary = self.getBinary()
        # __repr__ has to return ``str``.  On Python 2 the binary message
        # already is one and is returned untouched; on Python 3 it is
        # ``bytes``, so its printable representation is returned instead.
        if isinstance(binary, str):
            return binary
        return repr(binary)
def readTrue(data):
    """Decode a ``T`` argument: yields ``True`` and consumes no data."""
    return (True, data)


def readFalse(data):
    """Decode an ``F`` argument: yields ``False`` and consumes no data."""
    return (False, data)


def readImpulse(data):
    """Decode an ``I`` argument: yields an ``impulse`` and consumes no data."""
    return (impulse(), data)


def readNull(data):
    """Decode an ``N`` argument: yields a ``null`` and consumes no data."""
    return (null(), data)


def readString(data):
    """Read a NUL-terminated string from the front of ``data``.

    The string plus its terminator occupy a multiple of four bytes; the
    padding is skipped as well.

    :param data: text or bytes-like input.
    :returns: ``(string, rest)``, both of the same type as ``data``.  When
        ``data`` holds no terminator all of it is taken as the string and
        ``rest`` is empty.
    """
    # str.find / bytes.find need a needle of the matching type.
    terminator = "\0" if isinstance(data, str) else b"\0"
    length = data.find(terminator)
    if length < 0:
        # Unterminated input: consume everything rather than slicing with -1,
        # which would drop the last character and consume nothing.
        return (data, data[0:0])
    nextData = int(math.ceil((length + 1) / 4.0) * 4)
    return (data[0:length], data[nextData:])


def readBlob(data):
    """Read a blob: a 32-bit big-endian size followed by that many bytes.

    The blob contents are padded to a multiple of four bytes; the padding is
    skipped.

    :returns: ``(blob, rest)``; if the size cannot be unpacked an error is
        printed and ``("", data)`` is returned.
    """
    try:
        length = struct.unpack(">i", data[0:4])[0]
    except struct.error:
        print("Error: too few bytes for blob", data, len(data))
        return ("", data)
    nextData = int(math.ceil((length) / 4.0) * 4) + 4
    rest = data[nextData:]
    blob = data[4:length + 4]
    return (blob, rest)


def readInt(data):
    """Read a 32-bit big-endian signed integer from the front of ``data``.

    :returns: ``(integer, rest)``; if fewer than four bytes are available an
        error is printed and ``(0, data)`` is returned.
    """
    try:
        integer = struct.unpack(">i", data[0:4])[0]
    except struct.error:
        print("Error: too few bytes for int", data, len(data))
        return (0, data)
    return (integer, data[4:])
def readLong(data):
    """Interpret the first 8 bytes of ``data`` as a big-endian 64-bit signed
    integer.

    :returns: ``(value, rest)`` where ``rest`` is ``data`` without those
        8 bytes; when too few bytes are present an error is printed and
        ``(0, data)`` is returned.
    """
    try:
        (value,) = struct.unpack(">q", data[0:8])
    except struct.error:
        print("Error: too few bytes for long", data, len(data))
        return (0, data)
    return (value, data[8:])
def readDouble(data):
    """Interpret the first 8 bytes of ``data`` as a big-endian 64-bit double
    float.

    :returns: ``(value, rest)`` where ``rest`` is ``data`` without those
        8 bytes; when too few bytes are present an error is printed and
        ``(0, data)`` is returned.
    """
    try:
        (value,) = struct.unpack(">d", data[0:8])
    except struct.error:
        print("Error: too few bytes for double", data, len(data))
        return (0, data)
    return (value, data[8:])
def readFloat(data):
    """Interpret the first 4 bytes of ``data`` as a big-endian 32-bit float.

    :returns: ``(value, rest)`` where ``rest`` is ``data`` without those
        4 bytes; when too few bytes are present an error is printed and
        ``(0, data)`` is returned.
    """
    try:
        (value,) = struct.unpack(">f", data[0:4])
    except struct.error:
        print("Error: too few bytes for float", data, len(data))
        return (0, data)
    return (value, data[4:])
def OSCBlob(next):
    """Convert a string into an OSC Blob, returning a (typetag, data) tuple.

    The data is a 32-bit big-endian byte count followed by the payload,
    NUL-padded to a multiple of four bytes.

    :param next: ``bytes``, ``bytearray`` or text; text is encoded as UTF-8
        first so that the count is a byte count.
    :returns: ``(b'b', data)``, or ``(b'', b'')`` for any other type.
    """
    if isinstance(next, bytearray):
        payload = bytes(next)
    elif isinstance(next, bytes):
        payload = next
    elif isinstance(next, str):
        # Only reached on Python 3, where str is text rather than bytes;
        # struct's "s" format accepts bytes only.
        payload = next.encode("utf-8")
    else:
        return (b'', b'')

    length = len(payload)
    padded = (length + 3) // 4 * 4
    binary = struct.pack(">i%ds" % padded, length, payload)
    return (b'b', binary)
def OSCArgument(data):
    """Convert some Python types to their OSC binary representations,
    returning a (typetag, data) tuple.

    =============  =======  ==============================================
    Python value   typetag  data
    =============  =======  ==============================================
    ``bytearray``  ``b``    32-bit size + payload, NUL-padded to 4 bytes
    ``bool``       ``T/F``  empty
    ``float``      ``f``    32-bit big-endian float
    ``int``        ``i``    32-bit big-endian signed integer
    text/bytes     ``s``    NUL-terminated, NUL-padded to 4 bytes
    ``impulse``    ``I``    empty
    ``null``       ``N``    empty
    =============  =======  ==============================================

    Any other type yields ``(b"", b"")``.  Text is encoded as UTF-8 before
    packing.
    """
    if isinstance(data, bytearray):
        payload = bytes(data)
        padded = (len(payload) + 3) // 4 * 4
        return (b"b", struct.pack(">i%ds" % padded, len(payload), payload))

    # bool is a subclass of int, so it has to be tested before int.
    if isinstance(data, bool):
        return (b"T" if data else b"F", b"")

    if isinstance(data, float):
        return (b"f", struct.pack(">f", data))

    if isinstance(data, int):
        return (b"i", struct.pack(">i", data))

    # string_types additionally covers Python 2 ``unicode``.
    if isinstance(data, (bytes, str)) or isinstance(data, string_types):
        # struct's "s" format only takes bytes, and the padding has to be
        # computed from the encoded length, not the character count.
        payload = data if isinstance(data, bytes) else data.encode("utf-8")
        # Room for at least one terminating NUL, rounded up to 4 bytes.
        padded = (len(payload) // 4 + 1) * 4
        return (b"s", struct.pack(">%ds" % padded, payload))

    if isinstance(data, impulse):
        return (b"I", b"")

    if isinstance(data, null):
        return (b"N", b"")

    return (b"", b"")
def parseArgs(args):
    """Given a list of strings, produces a list where those strings have
    been parsed (where possible) as floats or integers.

    Each string is stripped of surrounding whitespace first.  A string that
    parses as a number becomes a ``float`` when it contains a ``"."`` and an
    ``int`` otherwise; anything else is kept as the stripped string.

    :param args: iterable of strings.
    :returns: a new list with one entry per input string.
    """
    parsed = []
    for arg in args:
        arg = arg.strip()
        try:
            interpretation = float(arg)
            if "." not in arg:
                interpretation = int(interpretation)
        except (ValueError, OverflowError):
            # Not a number (or not representable as an int, e.g. "inf"/"nan"):
            # it was a string after all.
            interpretation = arg
        parsed.append(interpretation)
    return parsed
def _oscNativeStr(value):
    """Return ``value`` as ``str``, decoding it when it is Python 3 bytes."""
    if isinstance(value, bytes) and not isinstance(value, str):
        return value.decode("utf-8", "replace")
    return value


def decodeOSC(data):
    """Converts a typetagged OSC message to a Python list.

    * For a plain message the list is ``[address, typetags, arg, ...]`` with
      address and typetags as ``str``.  If no data follows the address, the
      list is empty.
    * For a ``#bundle`` the time tag is read and discarded, and the list
      holds one decoded list per contained element.

    Decoding is best-effort: any exception raised while decoding is printed
    and ``None`` is returned.
    """
    try:
        table = {"i": readInt, "f": readFloat, "s": readString,
                 "b": readBlob, "d": readDouble, "t": readLong,
                 "T": readTrue, "F": readFalse, "I": readImpulse,
                 "N": readNull}
        decoded = []
        address, rest = readString(data)
        # On Python 3 the readers hand back bytes; comparing those with the
        # text constants below (or indexing them) would never match.
        address = _oscNativeStr(address)

        if address == "#bundle":
            _timetag, rest = readLong(rest)
            # Every element needs a 4-byte size header.  Stopping when fewer
            # bytes remain keeps the loop from spinning on trailing garbage,
            # because readInt consumes nothing when it fails.
            while len(rest) >= 4:
                length, rest = readInt(rest)
                if length < 0:
                    # A negative size cannot describe an element.
                    break
                decoded.append(decodeOSC(rest[:length]))
                rest = rest[length:]

        elif len(rest) > 0:
            typetags, rest = readString(rest)
            typetags = _oscNativeStr(typetags)
            decoded.append(address)
            decoded.append(typetags)
            if typetags[:1] == ",":
                for tag in typetags[1:]:
                    value, rest = table[tag](rest)
                    decoded.append(value)
            else:
                print("Oops, typetag lacks the magic ,")

        return decoded
    except Exception as e:
        print("exception: %s" % (e))
class CallbackManager(object):
    """This utility class maps OSC addresses to callables.

    The CallbackManager calls its callbacks with a list
    of decoded OSC arguments, including the address and
    the typetags as the first two arguments, followed by the ``source``
    value that was given to ``handle``/``dispatch``.

    A callback registered under the name ``'default'`` receives every message
    whose address has no callback of its own.
    """

    def __init__(self):
        self.callbacks = {}
        self.add(self.unbundler, "#bundle")

    def handle(self, data, source=None):
        """Given OSC data, tries to call the callback with the
        right address."""
        decoded = decodeOSC(data)
        self.dispatch(decoded, source)

    def dispatch(self, message, source=None):
        """Sends decoded OSC data to an appropriate callback.

        :param message: a decoded message (a list starting with the address)
            or a list of such messages; empty or ``None`` is ignored.
        :param source: opaque value handed through to the callback.
        :raises ValueError: if the first element is neither a list nor a
            string.
        """
        if not message:
            # ignore empty messages
            return

        first = message[0]
        if isinstance(first, list):
            # smells like nested messages
            for msg in message:
                self.dispatch(msg, source)
            return

        if not isinstance(first, (str, bytes)):
            raise ValueError("OSC message not recognized", message)

        # got a single message; callbacks are registered under text names,
        # so a bytes address is decoded for the lookup only.
        address = first
        if isinstance(address, bytes) and not isinstance(address, str):
            address = address.decode("utf-8", "replace")

        if address in self.callbacks:
            callbackfunction = self.callbacks[address]
        elif 'default' in self.callbacks:
            callbackfunction = self.callbacks['default']
        else:
            print('address %s not found ' % address)
            return

        try:
            callbackfunction(message, source)
        except IndexError as e:
            import traceback
            print('OSC callback %s caused an error: %s' % (address, e))
            traceback.print_exc()
            print('---------------------')
            raise

    def add(self, callback, name):
        """Adds a callback to our set of callbacks,
        or removes the callback with name if callback
        is None.

        :raises KeyError: when removing a name that is not registered.
        """
        if callback is None:
            del self.callbacks[name]
        else:
            self.callbacks[name] = callback

    def unbundler(self, messages, source=None):
        """Dispatch the messages in a decoded bundle.

        ``source`` is accepted because ``dispatch`` calls every callback with
        ``(message, source)``; it is passed on to the contained messages.
        """
        # first two elements are #bundle and the time tag, rest are messages.
        for message in messages[2:]:
            self.dispatch(message, source)
# Manual smoke test: builds a few messages, dumps them as hex, decodes them
# again and pushes them through a CallbackManager.  All results go to stdout.
if __name__ == "__main__":
    hexDump("Welcome to the OSC testing program.")
    print()

    # A message with an address, two ints, a float and a string.
    message = OSCMessage()
    message.setAddress("/foo/play")
    message.append(44)
    message.append(11)
    message.append(4.5)
    message.append("the white cliffs of dover")
    hexDump(message.getBinary())

    print("Making and unmaking a message..")

    # No address is set here, so the address of this message stays "".
    strings = OSCMessage()
    strings.append("Mary had a little lamb")
    strings.append("its fleece was white as snow")
    strings.append("and everywhere that Mary went,")
    strings.append("the lamb was sure to go.")
    strings.append(14.5)
    strings.append(14.5)
    strings.append(-400)

    raw = strings.getBinary()

    hexDump(raw)

    print("Retrieving arguments...")
    data = raw
    # Six strings: presumably the address, the typetags and the four text
    # arguments appended above.
    for i in range(6):
        text, data = readString(data)
        print(text)

    # Then the numeric arguments, in the order they were appended.
    number, data = readFloat(data)
    print(number)

    number, data = readFloat(data)
    print(number)

    number, data = readInt(data)
    print(number)

    hexDump(raw)
    print(decodeOSC(raw))
    print(decodeOSC(message.getBinary()))

    print("Testing Blob types.")

    # Blobs of length 0 through 5, followed by one int.
    blob = OSCMessage()
    blob.append("","b")
    blob.append("b","b")
    blob.append("bl","b")
    blob.append("blo","b")
    blob.append("blob","b")
    blob.append("blobs","b")
    blob.append(42)

    hexDump(blob.getBinary())

    print(decodeOSC(blob.getBinary()))

    def printingCallback(*stuff):
        # Writes every positional argument it was called with on one line.
        sys.stdout.write("Got: ")
        for i in stuff:
            sys.stdout.write(str(i) + " ")
        sys.stdout.write("\n")

    print("Testing the callback manager.")

    c = CallbackManager()
    c.add(printingCallback, "/print")

    # First with the "/foo/play" address, for which no callback was added,
    # then again after re-addressing the message to "/print".
    c.handle(message.getBinary())
    message.setAddress("/print")
    c.handle(message.getBinary())

    print1 = OSCMessage()
    print1.setAddress("/print")
    print1.append("Hey man, that's cool.".encode('utf-8'))
    print1.append(42)
    print1.append(3.1415926)

    c.handle(print1.getBinary())

    # The bundle is assembled by hand inside the argument area of a message:
    # the "#bundle" string, two ints and two copies of print1 as blobs.
    bundle = OSCMessage()
    bundle.setAddress("")
    bundle.append("#bundle".encode('utf-8'))
    bundle.append(0)
    bundle.append(0)
    bundle.append(print1.getBinary(), 'b')
    bundle.append(print1.getBinary(), 'b')

    # Only the argument area is sent, not bundle.getBinary().
    bundlebinary = bundle.message

    print("sending a bundle to the callback manager")
    c.handle(bundlebinary)