/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim: set et sw=2 ts=2: */ /*************************************************************************** * munia_proto.cc * * Fri Feb 24 08:23:16 CET 2012 * Copyright 2012 Bent Bisballe Nyeng * deva@aasimon.org ****************************************************************************/ /* * This file is part of Munia. * * Munia is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * Munia is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with Munia; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. */ #include "munia_proto.h" #include #include #include #include #include "hugin.hpp" #include "node.h" #include "messageparser.h" #include "messagehandler.h" #include "connectionhandler.h" // Defines global node_manager object #include "nodemanager.h" #if 0 static void dump_handshake_info(struct lws_tokens *lwst) { int n; static const char *token_names[WSI_TOKEN_COUNT] = { /*[WSI_TOKEN_GET_URI] =*/ "GET URI", /*[WSI_TOKEN_HOST] =*/ "Host", /*[WSI_TOKEN_CONNECTION] =*/ "Connection", /*[WSI_TOKEN_KEY1] =*/ "key 1", /*[WSI_TOKEN_KEY2] =*/ "key 2", /*[WSI_TOKEN_PROTOCOL] =*/ "Protocol", /*[WSI_TOKEN_UPGRADE] =*/ "Upgrade", /*[WSI_TOKEN_ORIGIN] =*/ "Origin", /*[WSI_TOKEN_DRAFT] =*/ "Draft", /*[WSI_TOKEN_CHALLENGE] =*/ "Challenge", /* new for 04 */ /*[WSI_TOKEN_KEY] =*/ "Key", /*[WSI_TOKEN_VERSION] =*/ "Version", /*[WSI_TOKEN_SWORIGIN] =*/ "Sworigin", /* new for 05 */ /*[WSI_TOKEN_EXTENSIONS] =*/ "Extensions", /* client receives these */ /*[WSI_TOKEN_ACCEPT] =*/ "Accept", /*[WSI_TOKEN_NONCE] =*/ "Nonce", /*[WSI_TOKEN_HTTP] =*/ "Http", /*[WSI_TOKEN_MUXURL] =*/ "MuxURL", }; for (n = 0; n < WSI_TOKEN_COUNT; n++) { if (lwst[n].token == NULL || lwst[n].token_len == 0) { continue; } DEBUG(proto," %s = ", token_names[n]); if(fwrite(lwst[n].token, 1, lwst[n].token_len, stderr)) { } DEBUG(proto,"\n"); } } #endif typedef std::list msgqueue_t; static std::map msgqueue; struct lws *current_client = NULL; int callback_lws_node(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { node_manager.tree.toStdOut(); DEBUG(proto, "Callback on %p\n", wsi); switch (reason) { case LWS_CALLBACK_ESTABLISHED: connection_handler.init(wsi); break; case LWS_CALLBACK_CLOSED: connection_handler.close(wsi); break; case LWS_CALLBACK_SERVER_WRITEABLE: { // // Find and collapse remove/create msg's into single move msg at // the position of the remove msg. // msgqueue_t::iterator i = msgqueue[wsi].begin(); while(i != msgqueue[wsi].end()) { message_t &msg = *i; if(msg.cmd == cmd::remove) { msgqueue_t::iterator j = msgqueue[wsi].begin(); while(j != msgqueue[wsi].end()) { message_t m = *j; if(m.cmd == cmd::create && // msg was a create msg? // created with same ids as we just removed? msg.remove.id == m.create.id && msg.tid == m.tid) { break; } j++; } // collapse into move msg if(j != msgqueue[wsi].end()) { nodeid_t id = j->create.id; nodeid_t parentid = j->create.parentid; msgqueue[wsi].erase(j); // erase create msg msg.cmd = cmd::move; msg.move.id = id; msg.move.parentid = parentid; } } i++; } std::string msgstr; msgstr.append((size_t)LWS_SEND_BUFFER_PRE_PADDING, ' '); if(msgqueue[wsi].size() == 0 && wsi == current_client) msgstr += ";"; while(msgqueue[wsi].size() > 0) { message_t msg = msgqueue[wsi].front(); msgqueue[wsi].pop_front(); char buf[32]; sprintf(buf, "%d", msg.tid); if(msgstr.size() > LWS_SEND_BUFFER_PRE_PADDING) { msgstr += " "; } msgstr += std::string(buf) + " " + msg_tostring(msg); } msgstr.append((size_t)LWS_SEND_BUFFER_POST_PADDING, ' '); int n = lws_write(wsi, (unsigned char *) msgstr.c_str() + LWS_SEND_BUFFER_PRE_PADDING, msgstr.length() - LWS_SEND_BUFFER_POST_PADDING - LWS_SEND_BUFFER_PRE_PADDING, LWS_WRITE_TEXT); if(n < 0) { DEBUG(proto,"ERROR writing to socket"); exit(1); } } break; // case LWS_CALLBACK_BROADCAST: // printf("LWS_CALLBACK_BROADCAST\n"); // n = lws_write(wsi, (unsigned char*)in, len, LWS_WRITE_TEXT); // if (n < 0) DEBUG(proto, "node write failed\n"); // break; case LWS_CALLBACK_RECEIVE: { DEBUG(proto, "LWS_CALLBACK_RECEIVE\n"); current_client = wsi; DEBUG(proto, "%s\n", (char*)in); std::string data; data.append((char*)in, len); MessageList mlst = parse_msg(data); DEBUG(proto, "Handling %d incoming message\n", (int)mlst.size()); MessageList omsgs = handle_msg(mlst, wsi); DEBUG(proto, "Handling %d outgoing messages\n", (int)omsgs.size()); MessageList::iterator omi = omsgs.begin(); while(omi != omsgs.end()) { DEBUG(proto, "Message\n"); if(omi->cmd == cmd::subscribe) { connection_handler.subscribe(wsi, omi->subscribe.id); NodeIdList ids; try { ids = node_manager.subNodes(omi->subscribe.id); } catch(...) { DEBUG(proto, "No such node %d\n", omi->subscribe.id); omi++; continue; } NodeIdList::iterator id = ids.begin(); while(id != ids.end()) { node_t node = node_manager.node(*id); message_t createmsg = create_msg_create(node); createmsg.tid = omi->subscribe.id; msgqueue[wsi].push_back(createmsg); std::map::iterator ai = node.attributes.begin(); while(ai != node.attributes.end()) { message_t updatemsg = create_msg_update(node, ai->first); DEBUG(subscribe, "%s\n", ai->first.c_str()); updatemsg.tid = omi->subscribe.id; msgqueue[wsi].push_back(updatemsg); ai++; } id++; } } else if(omi->cmd == cmd::unsubscribe) { connection_handler.unsubscribe(wsi, omi->unsubscribe.id); NodeIdList ids; try { ids = node_manager.subNodes(omi->unsubscribe.id); } catch(...) { DEBUG(proto, "No such node %d\n", omi->unsubscribe.id); omi++; continue; } NodeIdList::reverse_iterator id = ids.rbegin(); while(id != ids.rend()) { node_t node = node_manager.node(*id); message_t removemsg = create_msg_remove(node); removemsg.tid = omi->unsubscribe.id; msgqueue[wsi].push_back(removemsg); DEBUG(unsubscribe, "remove id: %d\n", *id); id++; } } else { DEBUG(proto, "%d nodes affected by command\n", (int)omi->nodes.size()); SubscriberList clients = connection_handler.subscriberlist(omi->nodes); DEBUG(proto, "Writing message to %d clients\n", (int)clients.size()); SubscriberList::iterator ci = clients.begin(); while(ci != clients.end()) { clientid_t clientid = (*ci).first; nodeid_t tid = (*ci).second; DEBUG(proto, "Subscriber id of node: %d\n", tid); message_t msg = *omi; msg.tid = tid; msgqueue[clientid].push_back(msg); ci++; } } omi++; } lws_callback_on_writable_all_protocol(lws_get_context(wsi), lws_get_protocol(wsi)); } break; default: break; } return 0; }