From 3ab33f728f61b12f85a6067d02610c2b5142a4a5 Mon Sep 17 00:00:00 2001 From: Bent Bisballe Nyeng Date: Fri, 24 Feb 2012 14:51:15 +0100 Subject: Fix crash. Hold queue for each client connection. --- src/muniad.cc | 7 +++--- src/task_proto.cc | 72 +++++++++++++++++++------------------------------------ 2 files changed, 29 insertions(+), 50 deletions(-) diff --git a/src/muniad.cc b/src/muniad.cc index 82311e7..ef76ff1 100644 --- a/src/muniad.cc +++ b/src/muniad.cc @@ -110,8 +110,7 @@ int main(int argc, char **argv) } } - if (!use_ssl) - cert_path = key_path = NULL; + if(!use_ssl) cert_path = key_path = NULL; context = libwebsocket_create_context(port, interface, protocols, libwebsocket_internal_extensions, @@ -121,7 +120,9 @@ int main(int argc, char **argv) return -1; } - while (1) libwebsocket_service(context, 50); + while (1) { + libwebsocket_service(context, 50); + } libwebsocket_context_destroy(context); diff --git a/src/task_proto.cc b/src/task_proto.cc index af1e31d..d4d9a70 100644 --- a/src/task_proto.cc +++ b/src/task_proto.cc @@ -31,6 +31,7 @@ #include #include +#include #include "task.h" @@ -72,23 +73,8 @@ static void dump_handshake_info(struct lws_tokens *lwst) } } -#define MAX_MESSAGE_QUEUE 1024 +std::map > msgqueue; -struct a_message { - void *payload; - size_t len; -}; - -typedef std::list ClientList; - -std::queue msgqueue; - -/* -static struct a_message ringbuffer[MAX_MESSAGE_QUEUE]; -static int ringbuffer_head; -*/ - -static ClientList clientlist; static int id_count = 0; int callback_lws_task(struct libwebsocket_context * context, @@ -100,8 +86,6 @@ int callback_lws_task(struct libwebsocket_context * context, struct per_session_data__lws_task *pss = (struct per_session_data__lws_task *)user; - printf("Current number of clients: %d\n", clientlist.size()); - switch (reason) { printf("============= Reason %d\n", reason); case LWS_CALLBACK_ESTABLISHED: @@ -109,7 +93,6 @@ int callback_lws_task(struct libwebsocket_context * context, fprintf(stderr, "callback_lws_task: LWS_CALLBACK_ESTABLISHED\n"); // pss->ringbuffer_tail = ringbuffer_head; pss->wsi = wsi; - clientlist.push_back(wsi); // send all current tasks char buf[32]; @@ -119,7 +102,7 @@ int callback_lws_task(struct libwebsocket_context * context, struct task t = *it; sprintf(buf, "add %d %s %s %d %d;", - t.id, t.title.c_str(), t.desc.c_str(), t.x, t.y); + t.id, t.title.c_str(), t.desc.c_str(), t.x, t.y); init_str.append(buf); } @@ -127,24 +110,16 @@ int callback_lws_task(struct libwebsocket_context * context, msg.append((size_t)LWS_SEND_BUFFER_PRE_PADDING, ' '); msg.append(init_str); msg.append((size_t)LWS_SEND_BUFFER_POST_PADDING, ' '); - msgqueue.push(msg); + msgqueue[wsi].push(msg); - /* - n = libwebsocket_write(wsi, (unsigned char*) init_str.data(), - init_str.length(), LWS_WRITE_TEXT); - if(n < 0) { - fprintf(stderr, "ERROR writing to socket"); - exit(1); - } - */ - libwebsocket_rx_flow_control(wsi, 1); + libwebsocket_callback_on_writable(context, wsi); } break; case LWS_CALLBACK_CLOSED: { printf("Closing connection\n"); - clientlist.remove(wsi); + msgqueue.erase(wsi); } case LWS_CALLBACK_SERVER_WRITEABLE: @@ -152,9 +127,9 @@ int callback_lws_task(struct libwebsocket_context * context, printf("LWS_CALLBACK_SERVER_WRITEABLE\n"); // if(pss->ringbuffer_tail != ringbuffer_head) { - if(msgqueue.size() > 0) { - std::string msg = msgqueue.front(); - msgqueue.pop(); + if(msgqueue[wsi].size() > 0) { + std::string msg = msgqueue[wsi].front(); + msgqueue[wsi].pop(); n = libwebsocket_write(wsi, (unsigned char *) msg.c_str() + LWS_SEND_BUFFER_PRE_PADDING, @@ -164,17 +139,11 @@ int callback_lws_task(struct libwebsocket_context * context, fprintf(stderr, "ERROR writing to socket"); exit(1); } - - /* - if(((ringbuffer_head - pss->ringbuffer_tail) % MAX_MESSAGE_QUEUE) - < (MAX_MESSAGE_QUEUE - 15)) - libwebsocket_rx_flow_control(wsi, 1); - */ - - // libwebsocket_rx_flow_control(wsi,msgqueue.size() > 0); - + } + + if(msgqueue[wsi].size()) { + libwebsocket_rx_flow_control(wsi, 1); libwebsocket_callback_on_writable(context, wsi); - usleep(200); } } break; @@ -276,8 +245,11 @@ int callback_lws_task(struct libwebsocket_context * context, bool id_found = false; TaskList::iterator it; + + x = x / 300 * 300; + for(it = tasklist.begin(); it != tasklist.end(); it++) { - struct task t = *it; + struct task &t = *it; if(t.id == id) { id_found = true; t.x = x; @@ -290,7 +262,7 @@ int callback_lws_task(struct libwebsocket_context * context, printf("\t!!!Could not locate task with id %d\n", id); } - buf_len = sprintf(buf, "move %d %d %d;", id, x / 300 * 300, y); + buf_len = sprintf(buf, "move %d %d %d;", id, x, y); printf("Moving task: %s\n", buf); } else if(cmd == "update") { printf("Update\n"); @@ -318,7 +290,13 @@ int callback_lws_task(struct libwebsocket_context * context, msg.append((size_t)LWS_SEND_BUFFER_PRE_PADDING, ' '); msg.append(buf, buf_len); msg.append((size_t)LWS_SEND_BUFFER_POST_PADDING, ' '); - msgqueue.push(msg); + + std::map >::iterator it = msgqueue.begin(); + while(it != msgqueue.end()) { + printf("!!\n"); + it->second.push(msg); + it++; + } /* if(((ringbuffer_head - pss->ringbuffer_tail) % MAX_MESSAGE_QUEUE) > (MAX_MESSAGE_QUEUE - 10)) libwebsocket_rx_flow_control(wsi, 0); -- cgit v1.2.3