/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim: set et sw=2 ts=2: */ /*************************************************************************** * munia_proto.cc * * Fri Feb 24 08:23:16 CET 2012 * Copyright 2012 Bent Bisballe Nyeng * deva@aasimon.org ****************************************************************************/ /* * This file is part of Munia. * * Munia is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * Munia is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with Munia; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. */ #include "munia_proto.h" #include #include #include #include #include "debug.h" #include "task.h" #include "messageparser.h" #include "messagehandler.h" #include "connectionhandler.h" // Defines global task_manager object #include "taskmanager.h" #if 0 static void dump_handshake_info(struct lws_tokens *lwst) { int n; static const char *token_names[WSI_TOKEN_COUNT] = { /*[WSI_TOKEN_GET_URI] =*/ "GET URI", /*[WSI_TOKEN_HOST] =*/ "Host", /*[WSI_TOKEN_CONNECTION] =*/ "Connection", /*[WSI_TOKEN_KEY1] =*/ "key 1", /*[WSI_TOKEN_KEY2] =*/ "key 2", /*[WSI_TOKEN_PROTOCOL] =*/ "Protocol", /*[WSI_TOKEN_UPGRADE] =*/ "Upgrade", /*[WSI_TOKEN_ORIGIN] =*/ "Origin", /*[WSI_TOKEN_DRAFT] =*/ "Draft", /*[WSI_TOKEN_CHALLENGE] =*/ "Challenge", /* new for 04 */ /*[WSI_TOKEN_KEY] =*/ "Key", /*[WSI_TOKEN_VERSION] =*/ "Version", /*[WSI_TOKEN_SWORIGIN] =*/ "Sworigin", /* new for 05 */ /*[WSI_TOKEN_EXTENSIONS] =*/ "Extensions", /* client receives these */ /*[WSI_TOKEN_ACCEPT] =*/ "Accept", /*[WSI_TOKEN_NONCE] =*/ "Nonce", /*[WSI_TOKEN_HTTP] =*/ "Http", /*[WSI_TOKEN_MUXURL] =*/ "MuxURL", }; for (n = 0; n < WSI_TOKEN_COUNT; n++) { if (lwst[n].token == NULL || lwst[n].token_len == 0) continue; fprintf(stderr, " %s = ", token_names[n]); if(fwrite(lwst[n].token, 1, lwst[n].token_len, stderr)) {} fprintf(stderr, "\n"); } } #endif typedef std::list msgqueue_t; static std::map msgqueue; struct libwebsocket *current_client = NULL; int callback_lws_task(struct libwebsocket_context * context, struct libwebsocket *wsi, enum libwebsocket_callback_reasons reason, void *user, void *in, size_t len) { task_manager.tree.toStdOut(); printf("Callback on %p\n", wsi); switch (reason) { case LWS_CALLBACK_ESTABLISHED: connection_handler.init(wsi); break; case LWS_CALLBACK_CLOSED: connection_handler.close(wsi); break; case LWS_CALLBACK_SERVER_WRITEABLE: { // // Find and collapse remove/create msg's into single move msg at // the position of the remove msg. // msgqueue_t::iterator i = msgqueue[wsi].begin(); while(i != msgqueue[wsi].end()) { message_t &msg = *i; if(msg.cmd == cmd::remove) { msgqueue_t::iterator j = msgqueue[wsi].begin(); while(j != msgqueue[wsi].end()) { message_t m = *j; if(m.cmd == cmd::create && // msg was a create msg? // created with same ids as we just removed? msg.remove.id == m.create.id && msg.tid == m.tid) { break; } j++; } // collapse into move msg if(j != msgqueue[wsi].end()) { taskid_t id = j->create.id; taskid_t parentid = j->create.parentid; msgqueue[wsi].erase(j); // erase create msg msg.cmd = cmd::move; msg.move.id = id; msg.move.parentid = parentid; } } i++; } std::string msgstr; msgstr.append((size_t)LWS_SEND_BUFFER_PRE_PADDING, ' '); if(msgqueue[wsi].size() == 0 && wsi == current_client) msgstr += ";"; while(msgqueue[wsi].size() > 0) { message_t msg = msgqueue[wsi].front(); msgqueue[wsi].pop_front(); char buf[32]; sprintf(buf, "%d", msg.tid); if(msgstr.size() > LWS_SEND_BUFFER_PRE_PADDING) msgstr += " "; msgstr += std::string(buf) + " " + msg_tostring(msg); } msgstr.append((size_t)LWS_SEND_BUFFER_POST_PADDING, ' '); int n = libwebsocket_write(wsi, (unsigned char *) msgstr.c_str() + LWS_SEND_BUFFER_PRE_PADDING, msgstr.length() - LWS_SEND_BUFFER_POST_PADDING - LWS_SEND_BUFFER_PRE_PADDING, LWS_WRITE_TEXT); if(n < 0) { fprintf(stderr, "ERROR writing to socket"); exit(1); } } break; /* case LWS_CALLBACK_BROADCAST: printf("LWS_CALLBACK_BROADCAST\n"); n = libwebsocket_write(wsi, (unsigned char*)in, len, LWS_WRITE_TEXT); if (n < 0) fprintf(stderr, "task write failed\n"); break; */ case LWS_CALLBACK_RECEIVE: { DEBUG(proto, "LWS_CALLBACK_RECEIVE\n"); current_client = wsi; printf("%s\n", (char*)in); std::string data; data.append((char*)in, len); MessageList mlst = parse_msg(data); DEBUG(proto, "Handling %d incoming message\n", mlst.size()); MessageList omsgs = handle_msg(mlst, wsi); DEBUG(proto, "Handling %d outgoing messages\n", omsgs.size()); MessageList::iterator omi = omsgs.begin(); while(omi != omsgs.end()) { DEBUG(proto, "Message\n"); if(omi->cmd == cmd::observe) { connection_handler.observe(wsi, omi->observe.id); TaskIdList ids; try { ids = task_manager.subTasks(omi->observe.id); } catch(...) { DEBUG(proto, "No such node %d\n", omi->observe.id); omi++; continue; } TaskIdList::iterator id = ids.begin(); while(id != ids.end()) { task_t task = task_manager.task(*id); message_t createmsg = create_msg_create(task); createmsg.tid = omi->observe.id; message_t updatemsg = create_msg_update(task); updatemsg.tid = omi->observe.id; msgqueue[wsi].push_back(createmsg); msgqueue[wsi].push_back(updatemsg); id++; } } else if(omi->cmd == cmd::unobserve) { connection_handler.unobserve(wsi, omi->observe.id); TaskIdList ids; try { ids = task_manager.subTasks(omi->observe.id); } catch(...) { DEBUG(proto, "No such node %d\n", omi->observe.id); omi++; continue; } TaskIdList::reverse_iterator id = ids.rbegin(); while(id != ids.rend()) { task_t task = task_manager.task(*id); message_t removemsg = create_msg_remove(task); removemsg.tid = omi->observe.id; msgqueue[wsi].push_back(removemsg); id++; } } else { DEBUG(proto, "%d nodes affected by command\n", omi->nodes.size()); ObserverList clients = connection_handler.observerlist(omi->nodes); DEBUG(proto, "Writing message to %d clients\n", clients.size()); ObserverList::iterator ci = clients.begin(); while(ci != clients.end()) { clientid_t clientid = (*ci).first; taskid_t tid = (*ci).second; DEBUG(proto, "Observer id of task: %d\n", tid); message_t msg = *omi; msg.tid = tid; msgqueue[clientid].push_back(msg); ci++; } } omi++; } libwebsocket_callback_on_writable_all_protocol( libwebsockets_get_protocol(wsi)); } break; default: break; } return 0; }