summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBent Bisballe Nyeng <deva@aasimon.org>2012-05-18 09:24:53 +0200
committerBent Bisballe Nyeng <deva@aasimon.org>2012-05-18 09:24:53 +0200
commit523701245d2cc34de627b1f2579561822f899deb (patch)
treebb608b1657142b5f0b69372c7c966627bdc8fc87
parenta09d97114ae07b4a527d87789456a1624a526c98 (diff)
Fix 'multi observe one client' problem with remove/create msg collapse into move msg.
-rw-r--r--src/munia_proto.cc136
1 files changed, 49 insertions, 87 deletions
diff --git a/src/munia_proto.cc b/src/munia_proto.cc
index 3621bbf..7e63d73 100644
--- a/src/munia_proto.cc
+++ b/src/munia_proto.cc
@@ -81,7 +81,8 @@ static void dump_handshake_info(struct lws_tokens *lwst)
}
#endif
-static std::map<struct libwebsocket *, std::queue<message_t> > msgqueue;
+typedef std::list<message_t> msgqueue_t;
+static std::map<struct libwebsocket *, msgqueue_t > msgqueue;
int callback_lws_task(struct libwebsocket_context * context,
struct libwebsocket *wsi,
@@ -102,108 +103,68 @@ int callback_lws_task(struct libwebsocket_context * context,
break;
case LWS_CALLBACK_SERVER_WRITEABLE:
- /*
{
- printf("Socket for client %p writable\n", wsi);
- if(msgqueue[wsi].size() > 0) {
- message_t msg = msgqueue[wsi].front();
- msgqueue[wsi].pop();
-
- std::string msgcmd = msg_tostring(msg);
- std::string msg;
- clientid_t clientid = (*ci).first;
- char tidstr[32];
- sprintf(tidstr, "%u", (*ci).second);
- printf("\tAdding data to %p's queue\n", clientid);
-
- msg.append((size_t)LWS_SEND_BUFFER_PRE_PADDING, ' ');
- msg.append(tidstr);
- msg.append(" ");
- msg.append(msgcmd);
- msg.append((size_t)LWS_SEND_BUFFER_POST_PADDING, ' ');
-
- int n = libwebsocket_write(wsi, (unsigned char *)
- msg.c_str() +
- LWS_SEND_BUFFER_PRE_PADDING,
- msg.length() - LWS_SEND_BUFFER_POST_PADDING -
- LWS_SEND_BUFFER_PRE_PADDING,
- LWS_WRITE_TEXT);
- if(n < 0) {
- fprintf(stderr, "ERROR writing to socket");
- exit(1);
+ //
+ // Find and collapse remove/create msg's into single move msg at
+ // the position of the remove msg.
+ //
+ msgqueue_t::iterator i = msgqueue[wsi].begin();
+ while(i != msgqueue[wsi].end()) {
+ message_t &msg = *i;
+ if(msg.cmd == cmd::remove) {
+
+ msgqueue_t::iterator j = msgqueue[wsi].begin();
+ while(j != msgqueue[wsi].end()) {
+ message_t m = *j;
+ if(m.cmd == cmd::create && // msg was a create msg?
+ // create with same ids as we just removed?
+ msg.remove.id == m.create.id && msg.tid == m.tid) {
+ break;
+ }
+ j++;
+ }
+
+ // collapse into move msg
+ if(j != msgqueue[wsi].end()) {
+ taskid_t id = j->create.id;
+ taskid_t parentid = j->create.id;
+ msgqueue[wsi].erase(j); // erase create msg
+ msg.cmd = cmd::move;
+ msg.move.id = id;
+ msg.move.parentid = parentid;
+ }
}
- }
- else {
- printf("\tNo queued data awaiting transmission\n");
+ i++;
}
- if(msgqueue[wsi].size()) {
- libwebsocket_rx_flow_control(wsi, 1);
- libwebsocket_callback_on_writable(context, wsi);
- }
- }
- */
- {
+
std::string msgstr;
msgstr.append((size_t)LWS_SEND_BUFFER_PRE_PADDING, ' ');
- int c = 0;
- message_t prevmsg;
- std::queue<message_t> messages;
while(msgqueue[wsi].size() > 0) {
message_t msg = msgqueue[wsi].front();
- msgqueue[wsi].pop();
-
- //Rewriting delete id followed by create id to move to_id from_id
- // look at previous message from create commands
- if(c > 0 && // look backward (skip first entry)
- msg.cmd == cmd::create && // look from create commands
- prevmsg.cmd == cmd::remove && // if previous msg is a remove command
- msg.create.id == prevmsg.remove.id) { // and commands have same id
-
- printf("\tprevmsg: %s, msg: %s\n", msg_tostring(prevmsg).c_str(),
- msg_tostring(msg).c_str());
- message_t movemsg = create_msg_move(msg.create.id, msg.create.parentid);
- movemsg.tid = msg.tid;
- msg = movemsg;
- }
- else if(c > 0) {
- messages.push(prevmsg);
- }
-
- prevmsg = msg;
- c++;
- }
- if(c > 0) { // Push last msg
- messages.push(prevmsg);
- }
-
- // create msg string
- // while(msgqueue[wsi].size() > 0) {
- while(messages.size() > 0) {
- message_t msg = messages.front();
- messages.pop();
+ msgqueue[wsi].pop_front();
char buf[32];
sprintf(buf, "%d", msg.tid);
- printf("msg: %s\n", msg_tostring(msg).c_str());
if(msgstr.size() > LWS_SEND_BUFFER_PRE_PADDING) msgstr += " ";
msgstr += std::string(buf) + " " + msg_tostring(msg);
}
msgstr.append((size_t)LWS_SEND_BUFFER_POST_PADDING, ' ');
-
+
int n = libwebsocket_write(wsi, (unsigned char *)
- msgstr.c_str() +
- LWS_SEND_BUFFER_PRE_PADDING,
- msgstr.length() -
- LWS_SEND_BUFFER_POST_PADDING -
- LWS_SEND_BUFFER_PRE_PADDING,
- LWS_WRITE_TEXT);
+ msgstr.c_str() +
+ LWS_SEND_BUFFER_PRE_PADDING,
+ msgstr.length() -
+ LWS_SEND_BUFFER_POST_PADDING -
+ LWS_SEND_BUFFER_PRE_PADDING,
+ LWS_WRITE_TEXT);
if(n < 0) {
fprintf(stderr, "ERROR writing to socket");
exit(1);
- } }
-
+ }
+ }
+
break;
/*
@@ -242,8 +203,8 @@ int callback_lws_task(struct libwebsocket_context * context,
message_t updatemsg = create_msg_update(task);
updatemsg.tid = omi->observe.id;
- msgqueue[wsi].push(createmsg);
- msgqueue[wsi].push(updatemsg);
+ msgqueue[wsi].push_back(createmsg);
+ msgqueue[wsi].push_back(updatemsg);
id++;
}
@@ -257,10 +218,11 @@ int callback_lws_task(struct libwebsocket_context * context,
message_t removemsg = create_msg_remove(task);
removemsg.tid = omi->observe.id;
- msgqueue[wsi].push(removemsg);
+ msgqueue[wsi].push_back(removemsg);
id++;
}
+
} else {
printf("%d nodes affected by command\n", omi->nodes.size());
@@ -277,7 +239,7 @@ int callback_lws_task(struct libwebsocket_context * context,
message_t msg = *omi;
msg.tid = tid;
- msgqueue[clientid].push(msg);
+ msgqueue[clientid].push_back(msg);
ci++;
}