/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim: set et sw=2 ts=2: */ /*************************************************************************** * munia_proto.cc * * Fri Feb 24 08:23:16 CET 2012 * Copyright 2012 Bent Bisballe Nyeng * deva@aasimon.org ****************************************************************************/ /* * This file is part of Munia. * * Munia is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * Munia is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with Munia; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. */ #include "munia_proto.h" #include #include #include #include #include "task.h" #include "messageparser.h" #include "messagehandler.h" #include "connectionhandler.h" // Defines global task_manager object #include "taskmanager.h" #if 0 static void dump_handshake_info(struct lws_tokens *lwst) { int n; static const char *token_names[WSI_TOKEN_COUNT] = { /*[WSI_TOKEN_GET_URI] =*/ "GET URI", /*[WSI_TOKEN_HOST] =*/ "Host", /*[WSI_TOKEN_CONNECTION] =*/ "Connection", /*[WSI_TOKEN_KEY1] =*/ "key 1", /*[WSI_TOKEN_KEY2] =*/ "key 2", /*[WSI_TOKEN_PROTOCOL] =*/ "Protocol", /*[WSI_TOKEN_UPGRADE] =*/ "Upgrade", /*[WSI_TOKEN_ORIGIN] =*/ "Origin", /*[WSI_TOKEN_DRAFT] =*/ "Draft", /*[WSI_TOKEN_CHALLENGE] =*/ "Challenge", /* new for 04 */ /*[WSI_TOKEN_KEY] =*/ "Key", /*[WSI_TOKEN_VERSION] =*/ "Version", /*[WSI_TOKEN_SWORIGIN] =*/ "Sworigin", /* new for 05 */ /*[WSI_TOKEN_EXTENSIONS] =*/ "Extensions", /* client receives these */ /*[WSI_TOKEN_ACCEPT] =*/ "Accept", /*[WSI_TOKEN_NONCE] =*/ "Nonce", /*[WSI_TOKEN_HTTP] =*/ "Http", /*[WSI_TOKEN_MUXURL] =*/ "MuxURL", }; for (n = 0; n < WSI_TOKEN_COUNT; n++) { if (lwst[n].token == NULL || lwst[n].token_len == 0) continue; fprintf(stderr, " %s = ", token_names[n]); if(fwrite(lwst[n].token, 1, lwst[n].token_len, stderr)) {} fprintf(stderr, "\n"); } } #endif static std::map > msgqueue; int callback_lws_task(struct libwebsocket_context * context, struct libwebsocket *wsi, enum libwebsocket_callback_reasons reason, void *user, void *in, size_t len) { task_manager.tree.toStdOut(); printf("Callback on %p\n", wsi); switch (reason) { case LWS_CALLBACK_ESTABLISHED: connection_handler.init(wsi); break; case LWS_CALLBACK_CLOSED: connection_handler.close(wsi); break; case LWS_CALLBACK_SERVER_WRITEABLE: /* { printf("Socket for client %p writable\n", wsi); if(msgqueue[wsi].size() > 0) { message_t msg = msgqueue[wsi].front(); msgqueue[wsi].pop(); std::string msgcmd = msg_tostring(msg); std::string msg; clientid_t clientid = (*ci).first; char tidstr[32]; sprintf(tidstr, "%u", (*ci).second); printf("\tAdding data to %p's queue\n", clientid); msg.append((size_t)LWS_SEND_BUFFER_PRE_PADDING, ' '); msg.append(tidstr); msg.append(" "); msg.append(msgcmd); msg.append((size_t)LWS_SEND_BUFFER_POST_PADDING, ' '); int n = libwebsocket_write(wsi, (unsigned char *) msg.c_str() + LWS_SEND_BUFFER_PRE_PADDING, msg.length() - LWS_SEND_BUFFER_POST_PADDING - LWS_SEND_BUFFER_PRE_PADDING, LWS_WRITE_TEXT); if(n < 0) { fprintf(stderr, "ERROR writing to socket"); exit(1); } } else { printf("\tNo queued data awaiting transmission\n"); } if(msgqueue[wsi].size()) { libwebsocket_rx_flow_control(wsi, 1); libwebsocket_callback_on_writable(context, wsi); } } */ { std::string msgstr; msgstr.append((size_t)LWS_SEND_BUFFER_PRE_PADDING, ' '); int c = 0; message_t prevmsg; std::queue messages; while(msgqueue[wsi].size() > 0) { message_t msg = msgqueue[wsi].front(); msgqueue[wsi].pop(); //Rewriting delete id followed by create id to move to_id from_id // look at previous message from create commands if(c > 0 && // look backward (skip first entry) msg.cmd == cmd::create && // look from create commands prevmsg.cmd == cmd::remove && // if previous msg is a remove command msg.create.id == prevmsg.remove.id) { // and commands have same id printf("\tprevmsg: %s, msg: %s\n", msg_tostring(prevmsg).c_str(), msg_tostring(msg).c_str()); message_t movemsg = create_msg_move(msg.create.id, msg.create.parentid); msg = movemsg; } else if(c > 0) { messages.push(prevmsg); } prevmsg = msg; c++; } if(c > 0) { // Push last msg messages.push(prevmsg); } // create msg string // while(msgqueue[wsi].size() > 0) { while(messages.size() > 0) { message_t msg = messages.front(); messages.pop(); char buf[32]; sprintf(buf, "%d", msg.tid); printf("msg: %s\n", msg_tostring(msg).c_str()); if(msgstr.size() > LWS_SEND_BUFFER_PRE_PADDING) msgstr += " "; msgstr += std::string(buf) + " " + msg_tostring(msg); } msgstr.append((size_t)LWS_SEND_BUFFER_POST_PADDING, ' '); int n = libwebsocket_write(wsi, (unsigned char *) msgstr.c_str() + LWS_SEND_BUFFER_PRE_PADDING, msgstr.length() - LWS_SEND_BUFFER_POST_PADDING - LWS_SEND_BUFFER_PRE_PADDING, LWS_WRITE_TEXT); if(n < 0) { fprintf(stderr, "ERROR writing to socket"); exit(1); } } break; /* case LWS_CALLBACK_BROADCAST: printf("LWS_CALLBACK_BROADCAST\n"); n = libwebsocket_write(wsi, (unsigned char*)in, len, LWS_WRITE_TEXT); if (n < 0) fprintf(stderr, "task write failed\n"); break; */ case LWS_CALLBACK_RECEIVE: { printf("LWS_CALLBACK_RECEIVE\n"); printf("%s\n", (char*)in); std::string data; data.append((char*)in, len); MessageList mlst = parse_msg(data); printf("Handling %d incoming message\n", mlst.size()); MessageList omsgs = handle_msg(mlst, wsi); printf("Handling %d outgoing messages\n", omsgs.size()); MessageList::iterator omi = omsgs.begin(); while(omi != omsgs.end()) { if(omi->cmd == cmd::observe) { TaskIdList ids = task_manager.subTasks(omi->observe.id); TaskIdList::iterator id = ids.begin(); while(id != ids.end()) { task_t task = task_manager.task(*id); message_t createmsg = create_msg_create(task); createmsg.tid = omi->observe.id; message_t updatemsg = create_msg_update(task); updatemsg.tid = omi->observe.id; msgqueue[wsi].push(createmsg); msgqueue[wsi].push(updatemsg); id++; } } else if(omi->cmd == cmd::unobserve) { TaskIdList ids = task_manager.subTasks(omi->observe.id); TaskIdList::reverse_iterator id = ids.rbegin(); while(id != ids.rend()) { task_t task = task_manager.task(*id); message_t removemsg = create_msg_remove(task); removemsg.tid = omi->observe.id; msgqueue[wsi].push(removemsg); id++; } } else { printf("%d nodes affected by command\n", omi->nodes.size()); ObserverList clients = connection_handler.observerlist(omi->nodes); printf("Writing message to %d clients\n", clients.size()); ObserverList::iterator ci = clients.begin(); while(ci != clients.end()) { clientid_t clientid = (*ci).first; taskid_t tid = (*ci).second; printf("Observer id of task: %d\n", tid); message_t msg = *omi; msg.tid = tid; msgqueue[clientid].push(msg); ci++; } } omi++; } libwebsocket_callback_on_writable_all_protocol( libwebsockets_get_protocol(wsi)); } break; default: break; } return 0; }