# pylint: disable=W0611
''' simpleOSC 0.2
ixi software - July, 2006
www.ixi-software.net
simple API for the Open SoundControl for Python (by Daniel Holth, Clinton
McChesney --> pyKit.tar.gz file at http://wiretap.stetson.edu)
Documentation at http://wiretap.stetson.edu/docs/pyKit/
The main aim of this implementation is to provide with a simple way to deal
with the OSC implementation that makes life easier to those who don't have
understanding of sockets or programming. This would not be on your screen without the help
of Daniel Holth.
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
Thanks for the support to Buchsenhausen, Innsbruck, Austria.
'''
from . import OSC
import socket, os, time, errno, sys
from threading import Lock
from kivy.logger import Logger
try:
# multiprocessing support is not good on window
if sys.platform in ('win32', 'cygwin'):
raise
use_multiprocessing = True
from multiprocessing import Process, Queue, Value
import multiprocessing.synchronize
Logger.info('OSC: using <multiprocessing> for socket')
except:
use_multiprocessing = False
from collections import deque
from threading import Thread
Logger.info('OSC: using <thread> for socket')
# globals
outSocket = 0
oscThreads = {}
oscLock = Lock()
if use_multiprocessing:
def _readQueue(thread_id=None):
global oscThreads
thread = oscThreads[thread_id]
try:
while True:
message = thread.queue.get_nowait()
thread.addressManager.handle(message)
except:
pass
class _OSCServer(Process):
def __init__(self, **kwargs):
self.addressManager = OSC.CallbackManager()
self.queue = Queue()
Process.__init__(self, args=(self.queue,))
self.daemon = True
self._isRunning = Value('b', True)
self._haveSocket= Value('b', False)
def _queue_message(self, message):
self.queue.put(message)
def _get_isRunning(self):
return self._isRunning.value
def _set_isRunning(self, value):
self._isRunning.value = value
isRunning = property(_get_isRunning, _set_isRunning)
def _get_haveSocket(self):
return self._haveSocket.value
def _set_haveSocket(self, value):
self._haveSocket.value = value
haveSocket = property(_get_haveSocket, _set_haveSocket)
else:
def _readQueue(thread_id=None):
thread = oscThreads[thread_id]
q = thread.queue
h = thread.addressManager.handle
while q:
h(q.popleft())
class _OSCServer(Thread):
def __init__(self, **kwargs):
Thread.__init__(self)
self.addressManager = OSC.CallbackManager()
self.queue = deque()
self.daemon = True
self.isRunning = True
self.haveSocket = False
def _queue_message(self, message):
self.queue.append(message)
[docs]def init() :
'''instantiates address manager and outsocket as globals
'''
global outSocket
outSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
[docs]def bind(oscid, func, oscaddress):
'''bind given oscaddresses with given functions in address manager
'''
global oscThreads
thread = oscThreads.get(oscid, None)
if thread is None:
assert('Unknown thread')
thread.addressManager.add(func, oscaddress)
[docs]def sendMsg(oscAddress, dataArray=[], ipAddr='127.0.0.1', port=9000, typehint=None) :
'''create and send normal OSC msgs
defaults to '127.0.0.1', port 9000
'''
with oscLock:
outSocket.sendto(
createBinaryMsg(oscAddress, dataArray, typehint), (ipAddr, port))
[docs]def createBundle():
'''create bundled type of OSC messages
'''
b = OSC.OSCMessage()
b.address = ""
b.append(b"#bundle")
b.append(0)
b.append(0)
return b
[docs]def appendToBundle(bundle, oscAddress, dataArray):
'''create OSC message and append it to a given bundle
'''
bundle.append(createBinaryMsg(oscAddress, dataArray), b'b')
[docs]def sendBundle(bundle, ipAddr='127.0.0.1', port=9000) :
'''convert bundle to a binary and send it
'''
with oscLock:
outSocket.sendto(bundle.message, (ipAddr, port))
[docs]def createBinaryMsg(oscAddress, dataArray, typehint=None):
'''create and return general type binary OSC msg
'''
m = OSC.OSCMessage()
m.address = oscAddress
for x in dataArray:
m.append(x, typehint)
return m.getBinary()
[docs]def readQueue(thread_id=None):
'''Read queues from all threads, and dispatch message.
This must be call in the main thread.
You can pass the thread id to deque message from a specific thread.
This id is returned from the listen() function'''
return _readQueue(thread_id)
################################ receive osc from The Other.
[docs]class OSCServer(_OSCServer):
def __init__(self, **kwargs):
kwargs.setdefault('ipAddr', '127.0.0.1')
kwargs.setdefault('port', 9001)
super(OSCServer, self).__init__()
self.ipAddr = kwargs.get('ipAddr')
self.port = kwargs.get('port')
[docs] def run(self):
self.haveSocket = False
# create socket
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# fix trouble if python leave without cleaning well the socket
# not needed under windows, he can reuse addr even if the socket
# are in fin2 or wait state.
if os.name in ['posix', 'mac'] and hasattr(socket, 'SO_REUSEADDR'):
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# try to bind the socket, retry if necessary
while not self.haveSocket and self.isRunning:
try :
self.socket.bind((self.ipAddr, self.port))
self.socket.settimeout(0.5)
self.haveSocket = True
except socket.error as e:
error, message = e.args
# special handle for EADDRINUSE
if error == errno.EADDRINUSE:
Logger.error('OSC: Address %s:%i already in use, retry in 2 second' % (self.ipAddr, self.port))
else:
self.haveSocket = False
# sleep 2 second before retry
time.sleep(2)
Logger.info('OSC: listening for Tuio on %s:%i' % (self.ipAddr, self.port))
while self.isRunning:
try:
message = self.socket.recv(65535)
self._queue_message(message)
except Exception as e:
if type(e) == socket.timeout:
continue
Logger.exception('OSC: Error in Tuio recv()')
return 'no data arrived'
[docs]def listen(ipAddr='127.0.0.1', port=9001):
'''Creates a new thread listening to that port
defaults to ipAddr='127.0.0.1', port 9001
'''
global oscThreads
thread_id = '%s:%d' % (ipAddr, port)
if thread_id in oscThreads:
return
Logger.debug('OSC: Start thread <%s>' % thread_id)
oscThreads[thread_id] = OSCServer(ipAddr=ipAddr, port=port)
oscThreads[thread_id].start()
return thread_id
[docs]def dontListen(thread_id = None):
'''closes the socket and kills the thread
'''
global oscThreads
if thread_id and thread_id in oscThreads:
ids = [thread_id]
else:
ids = list(oscThreads.keys())
for thread_id in ids:
#oscThreads[thread_id].socket.close()
Logger.debug('OSC: Stop thread <%s>' % thread_id)
oscThreads[thread_id].isRunning = False
oscThreads[thread_id].join()
Logger.debug('OSC: Stop thread <%s> finished' % thread_id)
del oscThreads[thread_id]
if __name__ == '__main__':
# example of how to use oscAPI
init()
oscid = listen() # defaults to "127.0.0.1", 9001
time.sleep(5)
# add addresses to callback manager
def printStuff(msg):
'''deals with "print" tagged OSC addresses
'''
print("printing in the printStuff function ", msg)
print("the oscaddress is ", msg[0])
print("the value is ", msg[2])
bind(oscid, printStuff, "/test")
#send normal msg, two ways
sendMsg("/test", [1, 2, 3], "127.0.0.1", 9000)
sendMsg("/test2", [1, 2, 3]) # defaults to "127.0.0.1", 9000
sendMsg("/hello") # defaults to [], "127.0.0.1", 9000
# create and send bundle, to ways to send
bundle = createBundle()
appendToBundle(bundle, "/testing/bundles", [1, 2, 3])
appendToBundle(bundle, "/testing/bundles", [4, 5, 6])
sendBundle(bundle, "127.0.0.1", 9000)
sendBundle(bundle) # defaults to "127.0.0.1", 9000
dontListen() # finally close the connection bfore exiting or program