Skip to content
Snippets Groups Projects
fastopc.py 3.82 KiB
Newer Older
  • Learn to ignore specific revisions
    """Simple high-performance Open Pixel Control client."""
    #
    # Copyright (c) 2013 Micah Elizabeth Scott
    # 
    # Permission is hereby granted, free of charge, to any person obtaining a copy
    # of this software and associated documentation files (the "Software"), to deal
    # in the Software without restriction, including without limitation the rights
    # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    # copies of the Software, and to permit persons to whom the Software is
    # furnished to do so, subject to the following conditions:
    # 
    # The above copyright notice and this permission notice shall be included in
    # all copies or substantial portions of the Software.
    # 
    # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
    # THE SOFTWARE.
       
    import json
    import numpy
    import os
    import socket
    import struct
    import time
    
    
    class FastOPC(object):
        """High-performance Open Pixel Control client, using Numeric Python.

           By default, assumes the OPC server is running on localhost. This may be
           overridden with the OPC_SERVER environment variable, or the 'server'
           keyword argument. The server is given as 'host:port'; when the ':port'
           part is omitted, DEFAULT_PORT is used.

           The TCP connection is opened lazily by send() and re-opened on the next
           send() after any failure.
           """

        DEFAULT_SERVER = '127.0.0.1:7890'
        DEFAULT_PORT = 7890

        # Text (unicode) string type, spelled so it resolves on Python 2 and 3.
        _TEXT_TYPE = type(u'')

        # Byte-oriented containers that bytes() converts to their raw contents.
        # 'buffer' only exists on Python 2.
        try:
            _BUFFER_TYPES = (buffer, bytearray)  # noqa: F821
        except NameError:
            _BUFFER_TYPES = (bytearray,)

        def __init__(self, server=None):
            """Record the server address. No connection is made here.

               server: optional 'host:port' (or bare 'host') string. Falls back to
                       the OPC_SERVER environment variable, then DEFAULT_SERVER.

               Raises ValueError if the port part is present but not an integer.
               """
            self.server = server or os.getenv('OPC_SERVER') or self.DEFAULT_SERVER

            # Split on the last colon only, so a missing port is not an error.
            host, separator, port = self.server.rpartition(':')
            if separator:
                self.host = host
                self.port = int(port)
            else:
                self.host = self.server
                self.port = self.DEFAULT_PORT

            self.socket = None

        def _disconnect(self):
            """Close and forget the current socket, if any."""
            sock, self.socket = self.socket, None
            if sock is not None:
                try:
                    sock.close()
                except socket.error:
                    # Best effort: the connection is being discarded anyway.
                    pass

        def send(self, packet):
            """Send a low-level packet to the OPC server, connecting if necessary
               and handling disconnects. Returns True on success.

               On failure the socket is closed, the call sleeps briefly and
               returns False; socket errors are not propagated to the caller.
               """

            if self.socket is None:
                sock = None
                try:
                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    sock.connect((self.host, self.port))
                    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
                except socket.error:
                    # Release the half-open socket instead of leaking it.
                    if sock is not None:
                        sock.close()
                else:
                    self.socket = sock

            if self.socket is not None:
                try:
                    # sendall() keeps writing until the whole packet is out; a
                    # plain send() may stop early and corrupt the packet stream.
                    self.socket.sendall(packet)
                    return True
                except socket.error:
                    self._disconnect()

            # Limit CPU usage when polling for a server
            time.sleep(0.1)

            return False

        @classmethod
        def _pixelBytes(cls, source):
            """Convert one pixel source (see putPixels) into a byte string."""
            if isinstance(source, bytes):
                return source
            if isinstance(source, memoryview):
                return source.tobytes()
            if isinstance(source, cls._BUFFER_TYPES):
                return bytes(source)
            if isinstance(source, cls._TEXT_TYPE):
                # One character per byte; characters above 255 raise
                # UnicodeEncodeError rather than being sent mangled.
                return source.encode('latin-1')

            if not isinstance(source, numpy.ndarray):
                source = numpy.array(source)
            # Clips in place: an ndarray passed by the caller is modified.
            numpy.clip(source, 0, 255, out=source)
            return source.astype('B').tobytes()

        def putPixels(self, channel, *sources):
            """Send a list of 8-bit colors to the indicated channel. (OPC command 0x00).
               This command accepts a list of pixel sources, which are concatenated and sent.
               Pixel sources may be:

                - Strings, bytes or buffer objects (bytearray, memoryview) containing
                  pre-formatted 8-bit RGB pixel data
                - NumPy arrays or sequences containing 8-bit RGB pixel data.
                  If values are out of range, the array is modified.

               The packet header stores the total data length in 16 bits, so more
               than 65535 bytes of pixel data raises struct.error.

               Returns the result of send(): True on success, False otherwise.
               """

            parts = [self._pixelBytes(source) for source in sources]
            length = sum(len(part) for part in parts)

            # Header: channel, command 0, big-endian 16-bit data length.
            header = struct.pack('>BBH', channel, 0, length)
            return self.send(b''.join([header] + parts))

        def sysEx(self, systemId, commandId, msg):
            """Send a system-exclusive packet (command 0xFF on channel 0).

               systemId, commandId: 16-bit identifiers placed ahead of the payload.
               msg: payload as bytes; a text string is encoded as UTF-8.

               Returns the result of send(): True on success, False otherwise.
               """
            if not isinstance(msg, bytes):
                msg = msg.encode('utf-8')

            # The length field covers the two 16-bit identifiers plus the payload.
            header = struct.pack(">BBHHH", 0, 0xFF, len(msg) + 4, systemId, commandId)
            return self.send(header + msg)

        def setGlobalColorCorrection(self, gamma, r, g, b):
            """Send color-correction settings as a JSON sysEx message.

               The JSON object carries 'gamma' and a 'whitepoint' list [r, g, b],
               and is sent with system ID 1, command ID 1.

               Returns the result of send(): True on success, False otherwise.
               """
            config = json.dumps({'gamma': gamma, 'whitepoint': [r, g, b]})
            return self.sysEx(1, 1, config)