summaryrefslogtreecommitdiff
path: root/src/munia_proto.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/munia_proto.cc')
-rw-r--r--src/munia_proto.cc496
1 files changed, 261 insertions, 235 deletions
diff --git a/src/munia_proto.cc b/src/munia_proto.cc
index 0f03a84..6e944cf 100644
--- a/src/munia_proto.cc
+++ b/src/munia_proto.cc
@@ -46,246 +46,272 @@
#if 0
static void dump_handshake_info(struct lws_tokens *lwst)
{
- int n;
- static const char *token_names[WSI_TOKEN_COUNT] = {
- /*[WSI_TOKEN_GET_URI] =*/ "GET URI",
- /*[WSI_TOKEN_HOST] =*/ "Host",
- /*[WSI_TOKEN_CONNECTION] =*/ "Connection",
- /*[WSI_TOKEN_KEY1] =*/ "key 1",
- /*[WSI_TOKEN_KEY2] =*/ "key 2",
- /*[WSI_TOKEN_PROTOCOL] =*/ "Protocol",
- /*[WSI_TOKEN_UPGRADE] =*/ "Upgrade",
- /*[WSI_TOKEN_ORIGIN] =*/ "Origin",
- /*[WSI_TOKEN_DRAFT] =*/ "Draft",
- /*[WSI_TOKEN_CHALLENGE] =*/ "Challenge",
-
- /* new for 04 */
- /*[WSI_TOKEN_KEY] =*/ "Key",
- /*[WSI_TOKEN_VERSION] =*/ "Version",
- /*[WSI_TOKEN_SWORIGIN] =*/ "Sworigin",
-
- /* new for 05 */
- /*[WSI_TOKEN_EXTENSIONS] =*/ "Extensions",
-
- /* client receives these */
- /*[WSI_TOKEN_ACCEPT] =*/ "Accept",
- /*[WSI_TOKEN_NONCE] =*/ "Nonce",
- /*[WSI_TOKEN_HTTP] =*/ "Http",
- /*[WSI_TOKEN_MUXURL] =*/ "MuxURL",
- };
-
- for (n = 0; n < WSI_TOKEN_COUNT; n++) {
- if (lwst[n].token == NULL || lwst[n].token_len == 0) continue;
- DEBUG(proto," %s = ", token_names[n]);
- if(fwrite(lwst[n].token, 1, lwst[n].token_len, stderr)) {}
- DEBUG(proto,"\n");
- }
+ int n;
+ static const char *token_names[WSI_TOKEN_COUNT] = {
+ /*[WSI_TOKEN_GET_URI] =*/ "GET URI",
+ /*[WSI_TOKEN_HOST] =*/ "Host",
+ /*[WSI_TOKEN_CONNECTION] =*/ "Connection",
+ /*[WSI_TOKEN_KEY1] =*/ "key 1",
+ /*[WSI_TOKEN_KEY2] =*/ "key 2",
+ /*[WSI_TOKEN_PROTOCOL] =*/ "Protocol",
+ /*[WSI_TOKEN_UPGRADE] =*/ "Upgrade",
+ /*[WSI_TOKEN_ORIGIN] =*/ "Origin",
+ /*[WSI_TOKEN_DRAFT] =*/ "Draft",
+ /*[WSI_TOKEN_CHALLENGE] =*/ "Challenge",
+
+ /* new for 04 */
+ /*[WSI_TOKEN_KEY] =*/ "Key",
+ /*[WSI_TOKEN_VERSION] =*/ "Version",
+ /*[WSI_TOKEN_SWORIGIN] =*/ "Sworigin",
+
+ /* new for 05 */
+ /*[WSI_TOKEN_EXTENSIONS] =*/ "Extensions",
+
+ /* client receives these */
+ /*[WSI_TOKEN_ACCEPT] =*/ "Accept",
+ /*[WSI_TOKEN_NONCE] =*/ "Nonce",
+ /*[WSI_TOKEN_HTTP] =*/ "Http",
+ /*[WSI_TOKEN_MUXURL] =*/ "MuxURL",
+ };
+
+ for (n = 0; n < WSI_TOKEN_COUNT; n++)
+ {
+ if (lwst[n].token == NULL || lwst[n].token_len == 0)
+ {
+ continue;
+ }
+ DEBUG(proto," %s = ", token_names[n]);
+ if(fwrite(lwst[n].token, 1, lwst[n].token_len, stderr))
+ {
+ }
+ DEBUG(proto,"\n");
+ }
}
#endif
typedef std::list<message_t> msgqueue_t;
-static std::map<struct libwebsocket *, msgqueue_t > msgqueue;
-struct libwebsocket *current_client = NULL;
+static std::map<struct lws *, msgqueue_t > msgqueue;
+struct lws *current_client = NULL;
-int callback_lws_task(struct libwebsocket_context * context,
- struct libwebsocket *wsi,
- enum libwebsocket_callback_reasons reason,
+int callback_lws_task(struct lws *wsi,
+ enum lws_callback_reasons reason,
void *user, void *in, size_t len)
{
- task_manager.tree.toStdOut();
-
- DEBUG(proto, "Callback on %p\n", wsi);
-
- switch (reason) {
- case LWS_CALLBACK_ESTABLISHED:
- connection_handler.init(wsi);
- break;
-
- case LWS_CALLBACK_CLOSED:
- connection_handler.close(wsi);
- break;
-
- case LWS_CALLBACK_SERVER_WRITEABLE:
- {
- //
- // Find and collapse remove/create msg's into single move msg at
- // the position of the remove msg.
- //
- msgqueue_t::iterator i = msgqueue[wsi].begin();
- while(i != msgqueue[wsi].end()) {
- message_t &msg = *i;
- if(msg.cmd == cmd::remove) {
-
- msgqueue_t::iterator j = msgqueue[wsi].begin();
- while(j != msgqueue[wsi].end()) {
- message_t m = *j;
- if(m.cmd == cmd::create && // msg was a create msg?
- // created with same ids as we just removed?
- msg.remove.id == m.create.id &&
- msg.tid == m.tid) {
- break;
- }
- j++;
- }
-
- // collapse into move msg
- if(j != msgqueue[wsi].end()) {
- taskid_t id = j->create.id;
- taskid_t parentid = j->create.parentid;
- msgqueue[wsi].erase(j); // erase create msg
- msg.cmd = cmd::move;
- msg.move.id = id;
- msg.move.parentid = parentid;
- }
- }
- i++;
- }
-
- std::string msgstr;
- msgstr.append((size_t)LWS_SEND_BUFFER_PRE_PADDING, ' ');
-
- if(msgqueue[wsi].size() == 0 && wsi == current_client) msgstr += ";";
-
- while(msgqueue[wsi].size() > 0) {
- message_t msg = msgqueue[wsi].front();
- msgqueue[wsi].pop_front();
- char buf[32];
- sprintf(buf, "%d", msg.tid);
- if(msgstr.size() > LWS_SEND_BUFFER_PRE_PADDING) msgstr += " ";
- msgstr += std::string(buf) + " " + msg_tostring(msg);
- }
-
-
- msgstr.append((size_t)LWS_SEND_BUFFER_POST_PADDING, ' ');
-
- int n = libwebsocket_write(wsi, (unsigned char *)
- msgstr.c_str() +
- LWS_SEND_BUFFER_PRE_PADDING,
- msgstr.length() -
- LWS_SEND_BUFFER_POST_PADDING -
- LWS_SEND_BUFFER_PRE_PADDING,
- LWS_WRITE_TEXT);
- if(n < 0) {
- DEBUG(proto,"ERROR writing to socket");
- exit(1);
- }
- }
-
- break;
-
- /*
- case LWS_CALLBACK_BROADCAST:
- printf("LWS_CALLBACK_BROADCAST\n");
- n = libwebsocket_write(wsi, (unsigned char*)in, len, LWS_WRITE_TEXT);
- if (n < 0) DEBUG(proto, "task write failed\n");
- break;
- */
-
- case LWS_CALLBACK_RECEIVE:
- {
- DEBUG(proto, "LWS_CALLBACK_RECEIVE\n");
-
- current_client = wsi;
-
- DEBUG(proto, "%s\n", (char*)in);
- std::string data;
- data.append((char*)in, len);
-
- MessageList mlst = parse_msg(data);
- DEBUG(proto, "Handling %d incoming message\n", (int)mlst.size());
- MessageList omsgs = handle_msg(mlst, wsi);
- DEBUG(proto, "Handling %d outgoing messages\n", (int)omsgs.size());
-
- MessageList::iterator omi = omsgs.begin();
- while(omi != omsgs.end()) {
- DEBUG(proto, "Message\n");
- if(omi->cmd == cmd::observe) {
- connection_handler.observe(wsi, omi->observe.id);
- TaskIdList ids;
- try {
- ids = task_manager.subTasks(omi->observe.id);
- } catch(...) {
- DEBUG(proto, "No such node %d\n", omi->observe.id);
- omi++;
- continue;
- }
- TaskIdList::iterator id = ids.begin();
- while(id != ids.end()) {
- task_t task = task_manager.task(*id);
-
- message_t createmsg = create_msg_create(task);
- createmsg.tid = omi->observe.id;
- msgqueue[wsi].push_back(createmsg);
-
- std::map<std::string, std::string>::iterator ai =
- task.attributes.begin();
- while(ai != task.attributes.end()) {
- message_t updatemsg = create_msg_update(task, ai->first);
- DEBUG(observe, "%s\n", ai->first.c_str());
- updatemsg.tid = omi->observe.id;
- msgqueue[wsi].push_back(updatemsg);
- ai++;
- }
-
- id++;
- }
-
- } else if(omi->cmd == cmd::unobserve) {
- connection_handler.unobserve(wsi, omi->unobserve.id);
- TaskIdList ids;
- try {
- ids = task_manager.subTasks(omi->unobserve.id);
- } catch(...) {
- DEBUG(proto, "No such node %d\n", omi->unobserve.id);
- omi++;
- continue;
- }
- TaskIdList::reverse_iterator id = ids.rbegin();
- while(id != ids.rend()) {
- task_t task = task_manager.task(*id);
-
- message_t removemsg = create_msg_remove(task);
- removemsg.tid = omi->unobserve.id;
- msgqueue[wsi].push_back(removemsg);
- DEBUG(unobserve, "remove id: %d\n", *id);
-
- id++;
- }
-
- } else {
- DEBUG(proto, "%d nodes affected by command\n",
- (int)omi->nodes.size());
-
- ObserverList clients = connection_handler.observerlist(omi->nodes);
- DEBUG(proto, "Writing message to %d clients\n", (int)clients.size());
-
- ObserverList::iterator ci = clients.begin();
- while(ci != clients.end()) {
- clientid_t clientid = (*ci).first;
- taskid_t tid = (*ci).second;
-
- DEBUG(proto, "Observer id of task: %d\n", tid);
-
- message_t msg = *omi;
- msg.tid = tid;
-
- msgqueue[clientid].push_back(msg);
-
- ci++;
- }
- }
-
- omi++;
- }
-
- libwebsocket_callback_on_writable_all_protocol(
- libwebsockets_get_protocol(wsi));
- }
- break;
-
- default:
- break;
- }
-
- return 0;
+ task_manager.tree.toStdOut();
+
+ DEBUG(proto, "Callback on %p\n", wsi);
+
+ switch (reason)
+ {
+ case LWS_CALLBACK_ESTABLISHED:
+ connection_handler.init(wsi);
+ break;
+
+ case LWS_CALLBACK_CLOSED:
+ connection_handler.close(wsi);
+ break;
+
+ case LWS_CALLBACK_SERVER_WRITEABLE:
+ {
+ //
+ // Find and collapse remove/create msg's into single move msg at
+ // the position of the remove msg.
+ //
+ msgqueue_t::iterator i = msgqueue[wsi].begin();
+ while(i != msgqueue[wsi].end()) {
+ message_t &msg = *i;
+ if(msg.cmd == cmd::remove)
+ {
+ msgqueue_t::iterator j = msgqueue[wsi].begin();
+ while(j != msgqueue[wsi].end())
+ {
+ message_t m = *j;
+ if(m.cmd == cmd::create && // msg was a create msg?
+ // created with same ids as we just removed?
+ msg.remove.id == m.create.id &&
+ msg.tid == m.tid)
+ {
+ break;
+ }
+ j++;
+ }
+
+ // collapse into move msg
+ if(j != msgqueue[wsi].end())
+ {
+ taskid_t id = j->create.id;
+ taskid_t parentid = j->create.parentid;
+ msgqueue[wsi].erase(j); // erase create msg
+ msg.cmd = cmd::move;
+ msg.move.id = id;
+ msg.move.parentid = parentid;
+ }
+ }
+ i++;
+ }
+
+ std::string msgstr;
+ msgstr.append((size_t)LWS_SEND_BUFFER_PRE_PADDING, ' ');
+
+ if(msgqueue[wsi].size() == 0 && wsi == current_client) msgstr += ";";
+
+ while(msgqueue[wsi].size() > 0)
+ {
+ message_t msg = msgqueue[wsi].front();
+ msgqueue[wsi].pop_front();
+ char buf[32];
+ sprintf(buf, "%d", msg.tid);
+ if(msgstr.size() > LWS_SEND_BUFFER_PRE_PADDING)
+ {
+ msgstr += " ";
+ }
+ msgstr += std::string(buf) + " " + msg_tostring(msg);
+ }
+
+ msgstr.append((size_t)LWS_SEND_BUFFER_POST_PADDING, ' ');
+
+ int n = lws_write(wsi, (unsigned char *)
+ msgstr.c_str() +
+ LWS_SEND_BUFFER_PRE_PADDING,
+ msgstr.length() -
+ LWS_SEND_BUFFER_POST_PADDING -
+ LWS_SEND_BUFFER_PRE_PADDING,
+ LWS_WRITE_TEXT);
+ if(n < 0)
+ {
+ DEBUG(proto,"ERROR writing to socket");
+ exit(1);
+ }
+ }
+
+ break;
+
+// case LWS_CALLBACK_BROADCAST:
+// printf("LWS_CALLBACK_BROADCAST\n");
+// n = lws_write(wsi, (unsigned char*)in, len, LWS_WRITE_TEXT);
+// if (n < 0) DEBUG(proto, "task write failed\n");
+// break;
+
+ case LWS_CALLBACK_RECEIVE:
+ {
+ DEBUG(proto, "LWS_CALLBACK_RECEIVE\n");
+
+ current_client = wsi;
+
+ DEBUG(proto, "%s\n", (char*)in);
+ std::string data;
+ data.append((char*)in, len);
+
+ MessageList mlst = parse_msg(data);
+ DEBUG(proto, "Handling %d incoming message\n", (int)mlst.size());
+ MessageList omsgs = handle_msg(mlst, wsi);
+ DEBUG(proto, "Handling %d outgoing messages\n", (int)omsgs.size());
+
+ MessageList::iterator omi = omsgs.begin();
+ while(omi != omsgs.end())
+ {
+ DEBUG(proto, "Message\n");
+ if(omi->cmd == cmd::observe)
+ {
+ connection_handler.observe(wsi, omi->observe.id);
+ TaskIdList ids;
+ try
+ {
+ ids = task_manager.subTasks(omi->observe.id);
+ }
+ catch(...)
+ {
+ DEBUG(proto, "No such node %d\n", omi->observe.id);
+ omi++;
+ continue;
+ }
+ TaskIdList::iterator id = ids.begin();
+ while(id != ids.end())
+ {
+ task_t task = task_manager.task(*id);
+
+ message_t createmsg = create_msg_create(task);
+ createmsg.tid = omi->observe.id;
+ msgqueue[wsi].push_back(createmsg);
+
+ std::map<std::string, std::string>::iterator ai =
+ task.attributes.begin();
+ while(ai != task.attributes.end())
+ {
+ message_t updatemsg = create_msg_update(task, ai->first);
+ DEBUG(observe, "%s\n", ai->first.c_str());
+ updatemsg.tid = omi->observe.id;
+ msgqueue[wsi].push_back(updatemsg);
+ ai++;
+ }
+
+ id++;
+ }
+ }
+ else if(omi->cmd == cmd::unobserve)
+ {
+ connection_handler.unobserve(wsi, omi->unobserve.id);
+ TaskIdList ids;
+ try
+ {
+ ids = task_manager.subTasks(omi->unobserve.id);
+ }
+ catch(...)
+ {
+ DEBUG(proto, "No such node %d\n", omi->unobserve.id);
+ omi++;
+ continue;
+ }
+ TaskIdList::reverse_iterator id = ids.rbegin();
+ while(id != ids.rend())
+ {
+ task_t task = task_manager.task(*id);
+
+ message_t removemsg = create_msg_remove(task);
+ removemsg.tid = omi->unobserve.id;
+ msgqueue[wsi].push_back(removemsg);
+ DEBUG(unobserve, "remove id: %d\n", *id);
+
+ id++;
+ }
+
+ }
+ else
+ {
+ DEBUG(proto, "%d nodes affected by command\n",
+ (int)omi->nodes.size());
+
+ ObserverList clients = connection_handler.observerlist(omi->nodes);
+ DEBUG(proto, "Writing message to %d clients\n", (int)clients.size());
+
+ ObserverList::iterator ci = clients.begin();
+ while(ci != clients.end())
+ {
+ clientid_t clientid = (*ci).first;
+ taskid_t tid = (*ci).second;
+
+ DEBUG(proto, "Observer id of task: %d\n", tid);
+
+ message_t msg = *omi;
+ msg.tid = tid;
+
+ msgqueue[clientid].push_back(msg);
+
+ ci++;
+ }
+ }
+
+ omi++;
+ }
+
+ lws_callback_on_writable_all_protocol(lws_get_context(wsi),
+ lws_get_protocol(wsi));
+ }
+ break;
+
+ default:
+ break;
+ }
+
+ return 0;
}