summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBent Bisballe Nyeng <deva@aasimon.org>2012-05-04 11:52:36 +0200
committerBent Bisballe Nyeng <deva@aasimon.org>2012-05-04 11:52:36 +0200
commitba2094c0c735efc2677ab8c24bdf70ab70874a30 (patch)
tree1703ee75e71f0fb5d34fdc7cc9d21cec174d4585
parent5523263c6ca0782979394ba25c1679801bf5fe99 (diff)
Make observe/unobserve work. Store message_t instead of std::string in message queue for client write callback.
-rw-r--r--src/connectionhandler.cc2
-rw-r--r--src/munia_proto.cc116
2 files changed, 92 insertions, 26 deletions
diff --git a/src/connectionhandler.cc b/src/connectionhandler.cc
index c8881a3..da94cb8 100644
--- a/src/connectionhandler.cc
+++ b/src/connectionhandler.cc
@@ -67,7 +67,7 @@ void ConnectionHandler::unobserve(clientid_t clientid, taskid_t taskid)
ObserverList ConnectionHandler::observerlist(TaskIdList tasks)
{
- printf("Observerlist request\n");
+ printf("Observerlist request (#tasks: %d)\n", tasks.size());
ObserverList clients;
for(TaskIdList::iterator i = tasks.begin(); i != tasks.end(); i++) {
diff --git a/src/munia_proto.cc b/src/munia_proto.cc
index a91c204..b7123aa 100644
--- a/src/munia_proto.cc
+++ b/src/munia_proto.cc
@@ -81,7 +81,7 @@ static void dump_handshake_info(struct lws_tokens *lwst)
}
#endif
-static std::map<struct libwebsocket *, std::queue<std::string> > msgqueue;
+static std::map<struct libwebsocket *, std::queue<message_t> > msgqueue;
int callback_lws_task(struct libwebsocket_context * context,
struct libwebsocket *wsi,
@@ -102,11 +102,26 @@ int callback_lws_task(struct libwebsocket_context * context,
break;
case LWS_CALLBACK_SERVER_WRITEABLE:
+ /*
{
printf("Socket for client %p writable\n", wsi);
if(msgqueue[wsi].size() > 0) {
- std::string msg = msgqueue[wsi].front();
+ message_t msg = msgqueue[wsi].front();
msgqueue[wsi].pop();
+
+ std::string msgcmd = msg_tostring(msg);
+ std::string msg;
+ clientid_t clientid = (*ci).first;
+ char tidstr[32];
+ sprintf(tidstr, "%u", (*ci).second);
+ printf("\tAdding data to %p's queue\n", clientid);
+
+ msg.append((size_t)LWS_SEND_BUFFER_PRE_PADDING, ' ');
+ msg.append(tidstr);
+ msg.append(" ");
+ msg.append(msgcmd);
+ msg.append((size_t)LWS_SEND_BUFFER_POST_PADDING, ' ');
+
int n = libwebsocket_write(wsi, (unsigned char *)
msg.c_str() +
LWS_SEND_BUFFER_PRE_PADDING,
@@ -127,6 +142,34 @@ int callback_lws_task(struct libwebsocket_context * context,
libwebsocket_callback_on_writable(context, wsi);
}
}
+ */
+ {
+ std::string msgstr;
+ msgstr.append((size_t)LWS_SEND_BUFFER_PRE_PADDING, ' ');
+
+ while(msgqueue[wsi].size() > 0) {
+ message_t msg = msgqueue[wsi].front();
+ msgqueue[wsi].pop();
+ char buf[32];
+ sprintf(buf, "%d", msg.tid);
+ msgstr += std::string(buf) + " " + msg_tostring(msg);
+ }
+
+ msgstr.append((size_t)LWS_SEND_BUFFER_POST_PADDING, ' ');
+
+ int n = libwebsocket_write(wsi, (unsigned char *)
+ msgstr.c_str() +
+ LWS_SEND_BUFFER_PRE_PADDING,
+ msgstr.length() -
+ LWS_SEND_BUFFER_POST_PADDING -
+ LWS_SEND_BUFFER_PRE_PADDING,
+ LWS_WRITE_TEXT);
+ if(n < 0) {
+ fprintf(stderr, "ERROR writing to socket");
+ exit(1);
+ }
+ }
+
break;
/*
@@ -152,30 +195,53 @@ int callback_lws_task(struct libwebsocket_context * context,
MessageList::iterator omi = omsgs.begin();
while(omi != omsgs.end()) {
- std::string msgcmd = msg_tostring(*omi);
-
- printf("%d nodes affected by command\n", omi->nodes.size());
-
- std::list<std::pair<clientid_t, taskid_t> > clients =
- connection_handler.observerlist(omi->nodes);
- printf("Writing message to %d clients\n", clients.size());
-
- std::list<std::pair<clientid_t, taskid_t> >::iterator ci = clients.begin();
- while(ci != clients.end()) {
- std::string msg;
- clientid_t clientid = (*ci).first;
- char tidstr[32];
- sprintf(tidstr, "%u", (*ci).second);
- printf("\tAdding data to %p's queue\n", clientid);
-
- msg.append((size_t)LWS_SEND_BUFFER_PRE_PADDING, ' ');
- msg.append(tidstr);
- msg.append(" ");
- msg.append(msgcmd);
- msg.append((size_t)LWS_SEND_BUFFER_POST_PADDING, ' ');
+
+ if(omi->cmd == cmd::observe) {
+
+ TaskIdList ids = task_manager.subTasks(omi->observe.id);
+ TaskIdList::iterator id = ids.begin();
+ while(id != ids.end()) {
+ task_t task = task_manager.task(*id);
+
+ message_t createmsg = create_msg_create(task);
+ message_t updatemsg = create_msg_update(task);
+ msgqueue[wsi].push(createmsg);
+ msgqueue[wsi].push(updatemsg);
+
+ id++;
+ }
+
+ } else if(omi->cmd == cmd::unobserve) {
+
+ TaskIdList ids = task_manager.subTasks(omi->observe.id);
+ TaskIdList::iterator id = ids.begin();
+ while(id != ids.end()) {
+ task_t task = task_manager.task(*id);
- msgqueue[clientid].push(msg);
- ci++;
+ message_t removemsg = create_msg_remove(task);
+ msgqueue[wsi].push(removemsg);
+
+ id++;
+ }
+
+ } else {
+ printf("%d nodes affected by command\n", omi->nodes.size());
+
+ ObserverList clients = connection_handler.observerlist(omi->nodes);
+ printf("Writing message to %d clients\n", clients.size());
+
+ ObserverList::iterator ci = clients.begin();
+ while(ci != clients.end()) {
+ clientid_t clientid = (*ci).first;
+ taskid_t tid = (*ci).second;
+
+ message_t msg = *omi;
+ msg.tid = tid;
+
+ msgqueue[clientid].push(msg);
+
+ ci++;
+ }
}
omi++;