Newer
Older
* Open Pixel Control and HTTP server for Fadecandy.
*
* This is a TCP server which accepts either Open Pixel Control, HTTP, or WebSockets
* connections on a single port. We use a fork of libwebsockets which supports serving
* external non-HTTP protocols via a low-level receive callback.
*
* Copyright (c) 2013 Micah Elizabeth Scott
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "netserver.h"
#include "version.h"
#include "libwebsockets.h"
#include "rapidjson/stringbuffer.h"
#include "rapidjson/writer.h"
Micah Elizabeth Scott
committed
#include <algorithm>
NetServer::NetServer(OPC::callback_t opcCallback, jsonCallback_t jsonCallback,
void *context, bool verbose)
: mOpcCallback(opcCallback), mJsonCallback(jsonCallback),
mUserContext(context), mThread(0), mVerbose(verbose)
bool NetServer::start(const char *host, int port)
{
const int llNormal = LLL_ERR | LLL_WARN;
const int llVerbose = llNormal | LLL_NOTICE;
static struct libwebsocket_protocols protocols[] = {
// Only one protocol for now. Handles HTTP as well as our default WebSockets protocol.
"fcserver", // Name
lwsCallback, // Callback
sizeof(Client), // Protocol-specific data size
sizeof(OPC::Message), // Max frame size / rx buffer
},
{ NULL, NULL, 0, 0 } // terminator
};
struct lws_context_creation_info info;
memset(&info, 0, sizeof info);
info.gid = -1;
info.uid = -1;
info.host = host;
info.port = port;
info.protocols = protocols;
info.user = this;
// Quieter during create_context, since it's kind of chatty.
lws_set_log_level(llNormal, NULL);
struct libwebsocket_context *context = libwebsocket_create_context(&info);
if (!context) {
lwsl_err("libwebsocket init failed\n");
return false;
// Maybe set up a more verbose log level now.
lws_set_log_level(llVerbose, NULL);
Micah Elizabeth Scott
committed
lwsl_notice("Server listening on %s:%d\n", host ? host : "*", port);
// Note that we pass ownership of all libwebsockets state to this new thread.
// We shouldn't access it on the other threads afterwards.
mThread = new tthread::thread(threadFunc, context);
return true;
void NetServer::threadFunc(void *arg)
struct libwebsocket_context *context = (libwebsocket_context*) arg;
NetServer *self = (NetServer*) libwebsocket_context_user(context);
/*
* Mostly we're just handling incoming events from libwebsocket's poll(),
* but we do have some non-latency-critical broadcast events to flush out
* periodically.
*
* These would be faster if we could wake up libwebsocket's poll() from another
* thread, but there isn't an easy way to do that now, and we don't really care
* that much. During normal operation we'll be receiving lots of data over the
* network anyway.
*/
while (libwebsocket_service(context, 100) >= 0) {
self->flushBroadcastList();
}
libwebsocket_context_destroy(context);
Micah Elizabeth Scott
committed
}
int NetServer::lwsCallback(libwebsocket_context *context, libwebsocket *wsi,
enum libwebsocket_callback_reasons reason, void *user, void *in, size_t len)
Micah Elizabeth Scott
committed
{
* Protocol callback for libwebsockets.
*
* Until we have a reason to support a non-default WebSockets protocol, this handles
* everything: plain HTTP, Open Pixel Control, and WebSockets.
*
* For HTTP, this serves simple static HTML documents which are included at compile-time
* from the 'http' directory. Our web UI interacts with the server via the public WebSockets API.
*/
NetServer *self = (NetServer*) libwebsocket_context_user(context);
Client *client = (Client*) user;
switch (reason) {
case LWS_CALLBACK_CLOSED:
case LWS_CALLBACK_CLOSED_HTTP:
if (client && client->opcBuffer) {
free(client->opcBuffer);
client->opcBuffer = NULL;
}
self->mClients.erase(wsi);
break;
case LWS_CALLBACK_ESTABLISHED:
self->mClients.insert(wsi);
break;
case LWS_CALLBACK_HTTP:
return self->httpBegin(context, wsi, *client, (const char*) in);
case LWS_CALLBACK_HTTP_FILE_COMPLETION:
// Only serve one file per connect
return -1;
case LWS_CALLBACK_HTTP_WRITEABLE:
return self->httpWrite(context, wsi, *client);
case LWS_CALLBACK_SOCKET_READ:
// Low-level socket read. We may trap these for OPC protocol handling.
if (client->state != CLIENT_STATE_HTTP) {
return self->opcRead(context, wsi, *client, (uint8_t*)in, len);
Micah Elizabeth Scott
committed
}
Micah Elizabeth Scott
committed
case LWS_CALLBACK_RECEIVE:
// WebSockets data received
return self->wsRead(context, wsi, *client, (uint8_t*)in, len);
default:
break;
Micah Elizabeth Scott
committed
}
Micah Elizabeth Scott
committed
}
int NetServer::opcRead(libwebsocket_context *context, libwebsocket *wsi,
Client &client, uint8_t *in, size_t len)
Micah Elizabeth Scott
committed
{
/*
* Open Pixel Control packet dispatch, and protocol detection.
*
* Store the new packet in our protocol buffer. This should be large enough to never overflow,
* since our OPC buffer is large enough for two packets and the OPC max packet size is much larger
* than the network receive buffer.
*
* If we have no buffered data yet, we can do this without copying.
*/
OPCBuffer *opcb;
uint8_t *buffer;
unsigned bufferLength;
// Allocate the buffer we use for OPC reassembly and protocol-detect.
if (client.opcBuffer == NULL) {
opcb = (OPCBuffer*) malloc(sizeof *client.opcBuffer);
if (opcb == NULL) {
lwsl_err("ERROR: Out of memory allocating OPC reassembly buffer.\n");
return -1;
}
opcb->bufferLength = 0;
client.opcBuffer = opcb;
} else {
opcb = client.opcBuffer;
}
if (len + opcb->bufferLength > sizeof opcb->buffer) {
if (opcb->bufferLength) {
memcpy(opcb->bufferLength + opcb->buffer, in, len);
buffer = opcb->buffer;
bufferLength = opcb->bufferLength + len;
} else {
buffer = in;
bufferLength = len;
if (client.state == CLIENT_STATE_PROTOCOL_DETECT) {
/*
* It's a new connection, and we aren't sure yet whether it's native OPC
* or HTTP / WebSockets. We examine the first four bytes received. If it's
* "GET ", we assume this is HTTP. (Other HTTP methods are not needed).
* If it's anything else, we interpret the connection as native OPC and these
* are the first four bytes of the first OPC packet.
*/
if (bufferLength < 4) {
// Not enough data for protocol detect yet. Save this data for later.
if (buffer != opcb->buffer) {
memcpy(opcb->buffer, buffer, bufferLength);
opcb->bufferLength = bufferLength;
// Do not pass this data on to libwebsocket yet
return 1;
}
if (buffer[0] == 'G' && buffer[1] == 'E' && buffer[2] == 'T' && buffer[3] == ' ') {
// Detected HTTP. Convert this to an HTTP client, and let libwebsockets handle
// all data received so far. We can jettison the OPC buffer at this point.
free(client.opcBuffer);
client.opcBuffer = 0;
client.state = CLIENT_STATE_HTTP;
if (libwebsocket_read(context, wsi, buffer, bufferLength) < 0) {
return -1;
}
return 1;
Micah Elizabeth Scott
committed
}
// Not HTTP. Handle this as an OPC socket.
client.state = CLIENT_STATE_OPEN_PIXEL_CONTROL;
lwsl_notice("New Open Pixel Control connection\n");
// Process any and all complete packets from our buffer
while (1) {
if (bufferLength < OPC::HEADER_BYTES) {
// Still waiting for a header
Micah Elizabeth Scott
committed
break;
OPC::Message *msg = (OPC::Message*) buffer;
unsigned msgLength = OPC::HEADER_BYTES + msg->length();
if (bufferLength < msgLength) {
// Waiting for more data
Micah Elizabeth Scott
committed
break;
// Complete packet.
mOpcCallback(*msg, mUserContext);
buffer += msgLength;
bufferLength -= msgLength;
Micah Elizabeth Scott
committed
// If we have any residual data, save it for later.
if (bufferLength && buffer != opcb->buffer) {
memmove(opcb->buffer, buffer, bufferLength);
opcb->bufferLength = bufferLength;
// Don't pass data on to libwebsockets
return 1;
}
bool NetServer::httpPathEqual(const char *a, const char *b)
{
// HTTP path comparison. Stop at '?' or '#', to ignore query/fragment portions.
for (;;) {
char ca = *a;
char cb = *b;
if (ca == '?' || cb == '?' || ca == '#' || cb == '#')
return true;
if (ca != cb)
return false;
if (ca == '\0')
return true;
a++;
b++;
}
int NetServer::httpBegin(libwebsocket_context *context, libwebsocket *wsi,
Client &client, const char *path)
/*
* We have a new plain HTTP request. Match it against our document list, and send
* back headers for the response.
*
* Note: To keep the size of fcserver down, we compress our HTTP documents.
* We don't bother supporting decompressing them. Instead, we always send
* them back with deflate content-encoding.
HTTPDocument *doc = httpDocumentList;
// Look for this path in the document list. If it isn't found, we'll serve the 404 doc.
while (doc->path && !httpPathEqual(doc->path, path))
doc++;
if (!doc->path) {
lwsl_notice("HTTP document not found, \"%s\"\n", path);
char buffer[1024];
int size = snprintf(buffer, sizeof buffer,
"HTTP/1.1 %d %s\r\n"
"Server: %s\r\n"
"Content-Type: %s\r\n"
"Content-Length: %u\r\n"
"Content-Encoding: deflate\r\n"
"Connection: close\r\n"
"\r\n",
doc->path ? 200 : 404,
doc->path ? "OK" : "Not Found",
kFCServerVersion,
doc->contentType,
doc->contentLength
if (libwebsocket_write(wsi, (unsigned char*) buffer, size, LWS_WRITE_HTTP) < 0) {
return -1;
}
// Write the body asynchronously
client.httpBody = doc->body;
client.httpLength = doc->contentLength;
libwebsocket_callback_on_writable(context, wsi);
return 0;
int NetServer::httpWrite(libwebsocket_context *context, libwebsocket *wsi, Client &client)
if (!client.httpBody) {
return -1;
}
Micah Elizabeth Scott
committed
if (client.httpLength <= 0) {
// End of document
return -1;
Micah Elizabeth Scott
committed
}
int m = libwebsocket_write(wsi, (unsigned char *) client.httpBody, client.httpLength, LWS_WRITE_HTTP);
if (m < 0) {
return -1;
}
client.httpBody += m;
client.httpLength -= m;
} while (!lws_send_pipe_choked(wsi));
libwebsocket_callback_on_writable(context, wsi);
return 0;
}
int NetServer::wsRead(libwebsocket_context *context, libwebsocket *wsi, Client &client, uint8_t *in, size_t len)
{
// If this frame is binary, it's an OPC message. Does it parse?
if (lws_frame_is_binary(wsi)) {
OPC::Message *msg = (OPC::Message*) in;
if (len < OPC::HEADER_BYTES) {
lwsl_notice("NOTICE: Received binary WebSockets packet, but it's too small for an OPC header.\n");
return 0;
}
if (msg->lenLow != 0 || msg->lenHigh != 0) {
lwsl_notice("NOTICE: Received OPC packet over WebSockets with nonzero reserved (length) fields.\n");
}
if (len > sizeof *msg) {
lwsl_notice("NOTICE: Received oversized OPC packet over WebSockets. Truncating.\n");
len = sizeof *msg;
}
msg->setLength(len - OPC::HEADER_BYTES);
mOpcCallback(*msg, mUserContext);
return 0;
}
// Text frames are JSON encoded. Does that parse?
rapidjson::Document message;
message.ParseInsitu<0>((char*) in);
if (message.HasParseError()) {
lwsl_notice("NOTICE: Parse error in received JSON, character %d: %s\n",
int(message.GetErrorOffset()), message.GetParseError());
return 0;
}
if (!message.IsObject()) {
lwsl_notice("NOTICE: Received JSON is not an object {}\n");
return 0;
}
mJsonCallback(wsi, message, mUserContext);
return 0;
}
int NetServer::jsonReply(libwebsocket *wsi, rapidjson::Document &message)
{
jsonBuffer_t buffer;
jsonBufferPrepare(buffer, message);
return jsonBufferSend(buffer, wsi);
}
void NetServer::jsonBufferPrepare(jsonBuffer_t &buffer, rapidjson::Value &value)
{
// Pre-packet padding
rapidjson::PutN<>(buffer, 0, LWS_SEND_BUFFER_PRE_PADDING);
// Write serialized message
rapidjson::Writer<rapidjson::GenericStringBuffer<rapidjson::UTF8<>>> writer(buffer);
value.Accept(writer);
// Post-packet padding
rapidjson::PutN<>(buffer, 0, LWS_SEND_BUFFER_POST_PADDING);
int NetServer::jsonBufferSend(jsonBuffer_t &buffer, libwebsocket *wsi)
{
const char *string = buffer.GetString() + LWS_SEND_BUFFER_PRE_PADDING;
size_t len = buffer.Size() - LWS_SEND_BUFFER_PRE_PADDING - LWS_SEND_BUFFER_POST_PADDING;
return libwebsocket_write(wsi, (unsigned char *) string, len, LWS_WRITE_TEXT);
}
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
void NetServer::flushBroadcastList()
{
// Send any pending broadcast packets. These are enqueued by other threads on a list
// protected by mBroadcastMutex.
mBroadcastMutex.lock();
for (std::vector<jsonBuffer_t*>::iterator buf = mBroadcastList.begin(); buf != mBroadcastList.end(); ++buf) {
for (std::set<libwebsocket*>::iterator cli = mClients.begin(); cli != mClients.end(); ++cli) {
jsonBufferSend(**buf, *cli);
}
delete *buf;
}
mBroadcastList.clear();
mBroadcastMutex.unlock();
}
void NetServer::jsonBroadcast(rapidjson::Document &message)
{
jsonBuffer_t *buffer = new jsonBuffer_t();
jsonBufferPrepare(*buffer, message);
mBroadcastMutex.lock();
mBroadcastList.push_back(buffer);
mBroadcastMutex.unlock();
}