From 523701245d2cc34de627b1f2579561822f899deb Mon Sep 17 00:00:00 2001 From: Bent Bisballe Nyeng Date: Fri, 18 May 2012 09:24:53 +0200 Subject: Fix 'multi observe one client' problem with remove/create msg collapse into move msg. --- src/munia_proto.cc | 136 +++++++++++++++++++---------------------------------- 1 file changed, 49 insertions(+), 87 deletions(-) diff --git a/src/munia_proto.cc b/src/munia_proto.cc index 3621bbf..7e63d73 100644 --- a/src/munia_proto.cc +++ b/src/munia_proto.cc @@ -81,7 +81,8 @@ static void dump_handshake_info(struct lws_tokens *lwst) } #endif -static std::map > msgqueue; +typedef std::list msgqueue_t; +static std::map msgqueue; int callback_lws_task(struct libwebsocket_context * context, struct libwebsocket *wsi, @@ -102,108 +103,68 @@ int callback_lws_task(struct libwebsocket_context * context, break; case LWS_CALLBACK_SERVER_WRITEABLE: - /* { - printf("Socket for client %p writable\n", wsi); - if(msgqueue[wsi].size() > 0) { - message_t msg = msgqueue[wsi].front(); - msgqueue[wsi].pop(); - - std::string msgcmd = msg_tostring(msg); - std::string msg; - clientid_t clientid = (*ci).first; - char tidstr[32]; - sprintf(tidstr, "%u", (*ci).second); - printf("\tAdding data to %p's queue\n", clientid); - - msg.append((size_t)LWS_SEND_BUFFER_PRE_PADDING, ' '); - msg.append(tidstr); - msg.append(" "); - msg.append(msgcmd); - msg.append((size_t)LWS_SEND_BUFFER_POST_PADDING, ' '); - - int n = libwebsocket_write(wsi, (unsigned char *) - msg.c_str() + - LWS_SEND_BUFFER_PRE_PADDING, - msg.length() - LWS_SEND_BUFFER_POST_PADDING - - LWS_SEND_BUFFER_PRE_PADDING, - LWS_WRITE_TEXT); - if(n < 0) { - fprintf(stderr, "ERROR writing to socket"); - exit(1); + // + // Find and collapse remove/create msg's into single move msg at + // the position of the remove msg. + // + msgqueue_t::iterator i = msgqueue[wsi].begin(); + while(i != msgqueue[wsi].end()) { + message_t &msg = *i; + if(msg.cmd == cmd::remove) { + + msgqueue_t::iterator j = msgqueue[wsi].begin(); + while(j != msgqueue[wsi].end()) { + message_t m = *j; + if(m.cmd == cmd::create && // msg was a create msg? + // create with same ids as we just removed? + msg.remove.id == m.create.id && msg.tid == m.tid) { + break; + } + j++; + } + + // collapse into move msg + if(j != msgqueue[wsi].end()) { + taskid_t id = j->create.id; + taskid_t parentid = j->create.id; + msgqueue[wsi].erase(j); // erase create msg + msg.cmd = cmd::move; + msg.move.id = id; + msg.move.parentid = parentid; + } } - } - else { - printf("\tNo queued data awaiting transmission\n"); + i++; } - if(msgqueue[wsi].size()) { - libwebsocket_rx_flow_control(wsi, 1); - libwebsocket_callback_on_writable(context, wsi); - } - } - */ - { + std::string msgstr; msgstr.append((size_t)LWS_SEND_BUFFER_PRE_PADDING, ' '); - int c = 0; - message_t prevmsg; - std::queue messages; while(msgqueue[wsi].size() > 0) { message_t msg = msgqueue[wsi].front(); - msgqueue[wsi].pop(); - - //Rewriting delete id followed by create id to move to_id from_id - // look at previous message from create commands - if(c > 0 && // look backward (skip first entry) - msg.cmd == cmd::create && // look from create commands - prevmsg.cmd == cmd::remove && // if previous msg is a remove command - msg.create.id == prevmsg.remove.id) { // and commands have same id - - printf("\tprevmsg: %s, msg: %s\n", msg_tostring(prevmsg).c_str(), - msg_tostring(msg).c_str()); - message_t movemsg = create_msg_move(msg.create.id, msg.create.parentid); - movemsg.tid = msg.tid; - msg = movemsg; - } - else if(c > 0) { - messages.push(prevmsg); - } - - prevmsg = msg; - c++; - } - if(c > 0) { // Push last msg - messages.push(prevmsg); - } - - // create msg string - // while(msgqueue[wsi].size() > 0) { - while(messages.size() > 0) { - message_t msg = messages.front(); - messages.pop(); + msgqueue[wsi].pop_front(); char buf[32]; sprintf(buf, "%d", msg.tid); - printf("msg: %s\n", msg_tostring(msg).c_str()); if(msgstr.size() > LWS_SEND_BUFFER_PRE_PADDING) msgstr += " "; msgstr += std::string(buf) + " " + msg_tostring(msg); } msgstr.append((size_t)LWS_SEND_BUFFER_POST_PADDING, ' '); - + int n = libwebsocket_write(wsi, (unsigned char *) - msgstr.c_str() + - LWS_SEND_BUFFER_PRE_PADDING, - msgstr.length() - - LWS_SEND_BUFFER_POST_PADDING - - LWS_SEND_BUFFER_PRE_PADDING, - LWS_WRITE_TEXT); + msgstr.c_str() + + LWS_SEND_BUFFER_PRE_PADDING, + msgstr.length() - + LWS_SEND_BUFFER_POST_PADDING - + LWS_SEND_BUFFER_PRE_PADDING, + LWS_WRITE_TEXT); if(n < 0) { fprintf(stderr, "ERROR writing to socket"); exit(1); - } } - + } + } + break; /* @@ -242,8 +203,8 @@ int callback_lws_task(struct libwebsocket_context * context, message_t updatemsg = create_msg_update(task); updatemsg.tid = omi->observe.id; - msgqueue[wsi].push(createmsg); - msgqueue[wsi].push(updatemsg); + msgqueue[wsi].push_back(createmsg); + msgqueue[wsi].push_back(updatemsg); id++; } @@ -257,10 +218,11 @@ int callback_lws_task(struct libwebsocket_context * context, message_t removemsg = create_msg_remove(task); removemsg.tid = omi->observe.id; - msgqueue[wsi].push(removemsg); + msgqueue[wsi].push_back(removemsg); id++; } + } else { printf("%d nodes affected by command\n", omi->nodes.size()); @@ -277,7 +239,7 @@ int callback_lws_task(struct libwebsocket_context * context, message_t msg = *omi; msg.tid = tid; - msgqueue[clientid].push(msg); + msgqueue[clientid].push_back(msg); ci++; } -- cgit v1.2.3