From 802a8b7e4896a12d8eced17b6ee54c7bca02a629 Mon Sep 17 00:00:00 2001 From: Bent Bisballe Nyeng Date: Fri, 30 Mar 2012 12:30:32 +0200 Subject: Make task_proto use new design. --- src/Makefile.am | 4 + src/connectionhandler.h | 2 +- src/messageparser.cc | 20 +++- src/task_proto.cc | 282 ++++++------------------------------------------ src/task_proto.h | 2 +- src/xmlparser.cc | 4 +- 6 files changed, 57 insertions(+), 257 deletions(-) diff --git a/src/Makefile.am b/src/Makefile.am index 9731814..0916e96 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -17,6 +17,8 @@ muniad_SOURCES = \ saxparser.cc \ task.cc \ task_proto.cc \ + taskmanager.cc \ + tasktree.cc \ observe_proto.cc \ xml_encode_decode.cc \ xmlparser.cc @@ -31,6 +33,8 @@ EXTRA_DIST = \ saxparser.h \ task.h \ task_proto.h \ + taskmanager.h \ + tasktree.h \ observe_proto.h \ xml_encode_decode.h \ xmlparser.h diff --git a/src/connectionhandler.h b/src/connectionhandler.h index 9aba0cb..f5e63d7 100644 --- a/src/connectionhandler.h +++ b/src/connectionhandler.h @@ -34,7 +34,7 @@ #include "task.h" -typedef void* clientid_t; +typedef struct libwebsocket* clientid_t; typedef std::map > ConnectionList; diff --git a/src/messageparser.cc b/src/messageparser.cc index cb46669..e6d5e46 100644 --- a/src/messageparser.cc +++ b/src/messageparser.cc @@ -108,7 +108,9 @@ inline static void create_msg_list(MsgTokensList& msgTokensList, message_t m; - if(t[0] == "add") m.cmd = cmd::add; + if(t[0] == "observe") m.cmd = cmd::observe; + else if(t[0] == "unobserve") m.cmd = cmd::unobserve; + else if(t[0] == "add") m.cmd = cmd::add; else if(t[0] == "del") m.cmd = cmd::del; else if(t[0] == "move") m.cmd = cmd::move; else if(t[0] == "update") m.cmd = cmd::update; @@ -117,6 +119,22 @@ inline static void create_msg_list(MsgTokensList& msgTokensList, // printf("Number of tokens %d\n", t.size()); switch(m.cmd) { + case cmd::observe: { + if(t.size() != 1+1) { + printf("Wrong number of parameters\n"); + continue; + } + m.observe.id = atoi(t[1].c_str()); + break; + } + case cmd::unobserve: { + if(t.size() != 1+1) { + printf("Wrong number of parameters\n"); + continue; + } + m.unobserve.id = atoi(t[1].c_str()); + break; + } case cmd::add: { if(t.size() != 3+1) { printf("Wrong number of parameters\n"); diff --git a/src/task_proto.cc b/src/task_proto.cc index 6a4e762..cf647d5 100644 --- a/src/task_proto.cc +++ b/src/task_proto.cc @@ -35,8 +35,10 @@ #include "task.h" #include "messageparser.h" -#include "xmlparser.h" +#include "messagehandler.h" +#include "connectionhandler.h" +#if 0 static void dump_handshake_info(struct lws_tokens *lwst) { int n; @@ -74,6 +76,7 @@ static void dump_handshake_info(struct lws_tokens *lwst) fprintf(stderr, "\n"); } } +#endif static std::map > msgqueue; @@ -82,64 +85,18 @@ int callback_lws_task(struct libwebsocket_context * context, enum libwebsocket_callback_reasons reason, void *user, void *in, size_t len) { - int n; - struct per_session_data__lws_task *pss = - (struct per_session_data__lws_task *)user; - switch (reason) { - printf("============= Reason %d\n", reason); - case LWS_CALLBACK_ESTABLISHED: - { - fprintf(stderr, "callback_lws_task: LWS_CALLBACK_ESTABLISHED\n"); - // pss->ringbuffer_tail = ringbuffer_head; - pss->wsi = wsi; - /* - - // send all current tasks - // char buf[512]; - std::string init_str; - TaskList::iterator it; - for(it = tasklist.begin(); it != tasklist.end(); it++) { - task_t &t = *it; - - printf("add %d %s %s %d;", - t.id, t.title.c_str(), t.desc.c_str(), t.parent_id); - - // init_str.append(task_cmd(t)); - init_str += msg_tostring(create_msg(cmd::add, t)); - } - - std::string msg; - msg.append((size_t)LWS_SEND_BUFFER_PRE_PADDING, ' '); - msg.append(init_str); - msg.append((size_t)LWS_SEND_BUFFER_POST_PADDING, ' '); - msgqueue[wsi].push(msg); - - libwebsocket_callback_on_writable(context, wsi); - */ - } - break; - - case LWS_CALLBACK_CLOSED: - { - printf("Closing connection\n"); - msgqueue.erase(wsi); - } - break; - case LWS_CALLBACK_SERVER_WRITEABLE: { - printf("LWS_CALLBACK_SERVER_WRITEABLE\n"); - - // if(pss->ringbuffer_tail != ringbuffer_head) { if(msgqueue[wsi].size() > 0) { std::string msg = msgqueue[wsi].front(); msgqueue[wsi].pop(); - n = libwebsocket_write(wsi, (unsigned char *) - msg.c_str() + - LWS_SEND_BUFFER_PRE_PADDING, - msg.length() - LWS_SEND_BUFFER_POST_PADDING - LWS_SEND_BUFFER_PRE_PADDING, - LWS_WRITE_TEXT); + int n = libwebsocket_write(wsi, (unsigned char *) + msg.c_str() + + LWS_SEND_BUFFER_PRE_PADDING, + msg.length() - LWS_SEND_BUFFER_POST_PADDING - + LWS_SEND_BUFFER_PRE_PADDING, + LWS_WRITE_TEXT); if(n < 0) { fprintf(stderr, "ERROR writing to socket"); exit(1); @@ -153,11 +110,13 @@ int callback_lws_task(struct libwebsocket_context * context, } break; + /* case LWS_CALLBACK_BROADCAST: printf("LWS_CALLBACK_BROADCAST\n"); n = libwebsocket_write(wsi, (unsigned char*)in, len, LWS_WRITE_TEXT); if (n < 0) fprintf(stderr, "task write failed\n"); break; + */ case LWS_CALLBACK_RECEIVE: { @@ -167,211 +126,30 @@ int callback_lws_task(struct libwebsocket_context * context, std::string data; data.append((char*)in, len); - /* - struct a_message &msg = ringbuffer[ringbuffer_head]; - if(msg.payload) { - free(msg.payload); - msg.payload = NULL; - } - - */ - // char buf[1024]; - // size_t buf_len = 0; - - // print tasklist - TaskList::iterator it_taskbegin; - printf("-->["); - for(it_taskbegin = tasklist.begin(); - it_taskbegin != tasklist.end(); - it_taskbegin++) { - printf(" %d", it_taskbegin->id); - } - printf("]\n"); - - std::string buf_str; - - MsgVector msglist = parse_msg(data); - MsgVector::iterator it_msg; - for(it_msg = msglist.begin(); it_msg != msglist.end(); it_msg++) { - msg_t m = *it_msg; - switch(m.cmd) { - case cmd::add: { - printf("Handling add cmd\n"); - - // task_t t = create_task(m.add.title, m.add.desc, - // m.add.x, m.add.y); - task_t t = create_task(m.add.title, m.add.desc, - m.add.parent_id); - tasklist.insert(t); - // buf_len = sprintf(buf, "add %d %s %s %d %d;", - // t.id, t.title.c_str(), t.desc.c_str(), - // t.x, t.y); - - // msg_t tm = create_msg(cmd::add, t); - // std::string str = msg_tostring(tm); - buf_str = msg_tostring(create_msg(cmd::add, t)); - - printf("Adding task: %s\n", buf_str.c_str()); - break; - } - case cmd::del: { - printf("Delete\n"); - printf("Deleting task with id %d\n", m.del.id); - - // todo: delete all children recursively - task_t deleted_task; - // tasklist.erase(m.del.id, &deleted_task); - bool id_found = false; - TaskList::iterator it; - for(it = tasklist.begin(); it != tasklist.end(); it++) { - task_t &t = *it; - if(t.id == m.del.id) { - id_found = true; - deleted_task = t; - tasklist.erase(it); - break; - } - } - - if(!id_found) { - printf("\t!!!Could not locate task with id %d\n", m.del.id); - } - else { - // buf_len = sprintf(buf, "del %d;", m.del.id); - buf_str = msg_tostring(create_msg(cmd::del, deleted_task)); - printf("Deleting task: %s\n", buf_str.c_str()); - } - break; - } - case cmd::move: { - printf("Move\n"); - - printf("Moving task with id %d to %d\n", m.move.id, m.move.parent_id); - - bool id_found = false; - TaskList::iterator it; - - // int x = m.move.x / 300 * 300; - - task_t moved_task; - for(it = tasklist.begin(); it != tasklist.end(); it++) { - task_t &t = *it; - if(t.id == m.move.id) { - id_found = true; - // t.x = x; - // t.y = m.move.y; - t.parent_id = m.move.parent_id; - moved_task = t; - tasklist.move(t); - break; - } - } - - if(!id_found) { - printf("\t!!!Could not locate task with id %d\n", m.move.id); - } - - // buf_len = sprintf(buf, "move %d %d %d;", m.move.id, x, m.move.y); - buf_str = msg_tostring(create_msg(cmd::move, moved_task)); - printf("Moving task: %s\n", buf_str.c_str()); - break; - } - case cmd::update: { - printf("Updating %d\n", m.update.id); - - bool id_found = false; - TaskList::iterator it; - - task_t updated_task; - for(it = tasklist.begin(); it != tasklist.end(); it++) { - task_t &t = *it; - if(t.id == m.update.id) { - id_found = true; - t.title = m.update.title; - t.desc = m.update.desc; - updated_task = t; - break; - } - } - - if(!id_found) { - printf("\t!!!Could not locate task with id %d\n", m.update.id); - } - - buf_str = msg_tostring(create_msg(cmd::update, updated_task)); - printf("Updating task: %s\n", buf_str.c_str()); - - break; - } - default: - printf("Wrong command :(\n"); - break; - } - - if(!save_tasklist_to_file(tasklist, "/tmp/muniad.db")) { - printf("Could not flush db to file\n"); - } - else { - // XmlParser xml("/tmp/muniad.db"); - // xml.parse(); - // tasklist = xml.tasklist; - } - - // print tasklist - TaskList::iterator it_taskend; - printf("<--["); - for(it_taskend = tasklist.begin(); - it_taskend != tasklist.end(); - it_taskend++) { - printf(" %d", it_taskend->id); - } - printf("]\n"); - - // msg.payload = malloc(LWS_SEND_BUFFER_PRE_PADDING + len + LWS_SEND_BUFFER_POST_PADDING); - // msg.len = len; - // memcpy((char *)msg.payload + LWS_SEND_BUFFER_PRE_PADDING, in, len); - // if(ringbuffer_head == (MAX_MESSAGE_QUEUE - 1)) ringbuffer_head = 0; - // else ringbuffer_head++; - /* - msg.payload = malloc(LWS_SEND_BUFFER_PRE_PADDING + buf_len - + LWS_SEND_BUFFER_POST_PADDING); - msg.len = buf_len; - memcpy((char *)msg.payload + LWS_SEND_BUFFER_PRE_PADDING, - buf, buf_len); - if(ringbuffer_head == (MAX_MESSAGE_QUEUE - 1)) ringbuffer_head = 0; - else ringbuffer_head++; - */ + MessageList mlst = parse_msg(data); + MessageList omsgs = handle_msg(mlst); + MessageList::iterator omi = omsgs.begin(); + while(omi != omsgs.end()) { std::string msg; msg.append((size_t)LWS_SEND_BUFFER_PRE_PADDING, ' '); - // msg.append(buf, buf_len); - msg.append(buf_str); + msg.append(msg_tostring(*omi)); msg.append((size_t)LWS_SEND_BUFFER_POST_PADDING, ' '); - - std::map >::iterator it = msgqueue.begin(); - while(it != msgqueue.end()) { - printf("!!\n"); - it->second.push(msg); - it++; + + std::set clients = + connection_handler.observerlist(omi->nodes); + + std::set::iterator ci = clients.begin(); + while(ci != clients.end()) { + msgqueue[*ci].push(msg); + ci++; } - /* - if(((ringbuffer_head - pss->ringbuffer_tail) % MAX_MESSAGE_QUEUE) > (MAX_MESSAGE_QUEUE - 10)) - libwebsocket_rx_flow_control(wsi, 0); - */ - libwebsocket_callback_on_writable_all_protocol(libwebsockets_get_protocol(wsi)); - //libwebsocket_rx_flow_control(wsi, 0); + + omi++; } - } - break; - /* - * this just demonstrates how to use the protocol filter. If you won't - * study and reject connections based on header content, you don't need - * to handle this callback - */ - case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION: - printf("LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION\n"); - dump_handshake_info((struct lws_tokens *)(long)user); - /* you could return non-zero here and kill the connection */ + libwebsocket_callback_on_writable_all_protocol( + libwebsockets_get_protocol(wsi)); + } break; default: diff --git a/src/task_proto.h b/src/task_proto.h index fe57752..160dee9 100644 --- a/src/task_proto.h +++ b/src/task_proto.h @@ -32,7 +32,7 @@ #include -int callback_lws_task(struct libwebsocket_context * context, +int callback_lws_task(struct libwebsocket_context *context, struct libwebsocket *wsi, enum libwebsocket_callback_reasons reason, void *user, void *in, size_t len); diff --git a/src/xmlparser.cc b/src/xmlparser.cc index 7bc7e0a..97a8bdb 100644 --- a/src/xmlparser.cc +++ b/src/xmlparser.cc @@ -75,7 +75,7 @@ void XmlParser::startTag(std::string name, attributes_t &attr) { task = new task_t(); task->id = atoi(xml_decode(attr["id"]).c_str()); - task->parent_id = atoi(xml_decode(attr["parent_id"]).c_str()); + // task->parent_id = atoi(xml_decode(attr["parent_id"]).c_str()); } } @@ -83,7 +83,7 @@ void XmlParser::startTag(std::string name, attributes_t &attr) { void XmlParser::endTag(std::string name) { // printf("end: %s\n", name.c_str()); if (name == "task") { - if(task) tasklist.insert(*task); + // if(task) tasklist.push_back(*task); } else if (name == "title") { if(task) task->title = characterbuf; -- cgit v1.2.3