summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Makefile.am4
-rw-r--r--src/connectionhandler.h2
-rw-r--r--src/messageparser.cc20
-rw-r--r--src/task_proto.cc282
-rw-r--r--src/task_proto.h2
-rw-r--r--src/xmlparser.cc4
6 files changed, 57 insertions, 257 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 9731814..0916e96 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -17,6 +17,8 @@ muniad_SOURCES = \
saxparser.cc \
task.cc \
task_proto.cc \
+ taskmanager.cc \
+ tasktree.cc \
observe_proto.cc \
xml_encode_decode.cc \
xmlparser.cc
@@ -31,6 +33,8 @@ EXTRA_DIST = \
saxparser.h \
task.h \
task_proto.h \
+ taskmanager.h \
+ tasktree.h \
observe_proto.h \
xml_encode_decode.h \
xmlparser.h
diff --git a/src/connectionhandler.h b/src/connectionhandler.h
index 9aba0cb..f5e63d7 100644
--- a/src/connectionhandler.h
+++ b/src/connectionhandler.h
@@ -34,7 +34,7 @@
#include "task.h"
-typedef void* clientid_t;
+typedef struct libwebsocket* clientid_t;
typedef std::map<clientid_t, std::set<taskid_t> > ConnectionList;
diff --git a/src/messageparser.cc b/src/messageparser.cc
index cb46669..e6d5e46 100644
--- a/src/messageparser.cc
+++ b/src/messageparser.cc
@@ -108,7 +108,9 @@ inline static void create_msg_list(MsgTokensList& msgTokensList,
message_t m;
- if(t[0] == "add") m.cmd = cmd::add;
+ if(t[0] == "observe") m.cmd = cmd::observe;
+ else if(t[0] == "unobserve") m.cmd = cmd::unobserve;
+ else if(t[0] == "add") m.cmd = cmd::add;
else if(t[0] == "del") m.cmd = cmd::del;
else if(t[0] == "move") m.cmd = cmd::move;
else if(t[0] == "update") m.cmd = cmd::update;
@@ -117,6 +119,22 @@ inline static void create_msg_list(MsgTokensList& msgTokensList,
// printf("Number of tokens %d\n", t.size());
switch(m.cmd) {
+ case cmd::observe: {
+ if(t.size() != 1+1) {
+ printf("Wrong number of parameters\n");
+ continue;
+ }
+ m.observe.id = atoi(t[1].c_str());
+ break;
+ }
+ case cmd::unobserve: {
+ if(t.size() != 1+1) {
+ printf("Wrong number of parameters\n");
+ continue;
+ }
+ m.unobserve.id = atoi(t[1].c_str());
+ break;
+ }
case cmd::add: {
if(t.size() != 3+1) {
printf("Wrong number of parameters\n");
diff --git a/src/task_proto.cc b/src/task_proto.cc
index 6a4e762..cf647d5 100644
--- a/src/task_proto.cc
+++ b/src/task_proto.cc
@@ -35,8 +35,10 @@
#include "task.h"
#include "messageparser.h"
-#include "xmlparser.h"
+#include "messagehandler.h"
+#include "connectionhandler.h"
+#if 0
static void dump_handshake_info(struct lws_tokens *lwst)
{
int n;
@@ -74,6 +76,7 @@ static void dump_handshake_info(struct lws_tokens *lwst)
fprintf(stderr, "\n");
}
}
+#endif
static std::map<struct libwebsocket *, std::queue<std::string> > msgqueue;
@@ -82,64 +85,18 @@ int callback_lws_task(struct libwebsocket_context * context,
enum libwebsocket_callback_reasons reason,
void *user, void *in, size_t len)
{
- int n;
- struct per_session_data__lws_task *pss =
- (struct per_session_data__lws_task *)user;
-
switch (reason) {
- printf("============= Reason %d\n", reason);
- case LWS_CALLBACK_ESTABLISHED:
- {
- fprintf(stderr, "callback_lws_task: LWS_CALLBACK_ESTABLISHED\n");
- // pss->ringbuffer_tail = ringbuffer_head;
- pss->wsi = wsi;
- /*
-
- // send all current tasks
- // char buf[512];
- std::string init_str;
- TaskList::iterator it;
- for(it = tasklist.begin(); it != tasklist.end(); it++) {
- task_t &t = *it;
-
- printf("add %d %s %s %d;",
- t.id, t.title.c_str(), t.desc.c_str(), t.parent_id);
-
- // init_str.append(task_cmd(t));
- init_str += msg_tostring(create_msg(cmd::add, t));
- }
-
- std::string msg;
- msg.append((size_t)LWS_SEND_BUFFER_PRE_PADDING, ' ');
- msg.append(init_str);
- msg.append((size_t)LWS_SEND_BUFFER_POST_PADDING, ' ');
- msgqueue[wsi].push(msg);
-
- libwebsocket_callback_on_writable(context, wsi);
- */
- }
- break;
-
- case LWS_CALLBACK_CLOSED:
- {
- printf("Closing connection\n");
- msgqueue.erase(wsi);
- }
- break;
-
case LWS_CALLBACK_SERVER_WRITEABLE:
{
- printf("LWS_CALLBACK_SERVER_WRITEABLE\n");
-
- // if(pss->ringbuffer_tail != ringbuffer_head) {
if(msgqueue[wsi].size() > 0) {
std::string msg = msgqueue[wsi].front();
msgqueue[wsi].pop();
- n = libwebsocket_write(wsi, (unsigned char *)
- msg.c_str() +
- LWS_SEND_BUFFER_PRE_PADDING,
- msg.length() - LWS_SEND_BUFFER_POST_PADDING - LWS_SEND_BUFFER_PRE_PADDING,
- LWS_WRITE_TEXT);
+ int n = libwebsocket_write(wsi, (unsigned char *)
+ msg.c_str() +
+ LWS_SEND_BUFFER_PRE_PADDING,
+ msg.length() - LWS_SEND_BUFFER_POST_PADDING -
+ LWS_SEND_BUFFER_PRE_PADDING,
+ LWS_WRITE_TEXT);
if(n < 0) {
fprintf(stderr, "ERROR writing to socket");
exit(1);
@@ -153,11 +110,13 @@ int callback_lws_task(struct libwebsocket_context * context,
}
break;
+ /*
case LWS_CALLBACK_BROADCAST:
printf("LWS_CALLBACK_BROADCAST\n");
n = libwebsocket_write(wsi, (unsigned char*)in, len, LWS_WRITE_TEXT);
if (n < 0) fprintf(stderr, "task write failed\n");
break;
+ */
case LWS_CALLBACK_RECEIVE:
{
@@ -167,211 +126,30 @@ int callback_lws_task(struct libwebsocket_context * context,
std::string data;
data.append((char*)in, len);
- /*
- struct a_message &msg = ringbuffer[ringbuffer_head];
- if(msg.payload) {
- free(msg.payload);
- msg.payload = NULL;
- }
-
- */
- // char buf[1024];
- // size_t buf_len = 0;
-
- // print tasklist
- TaskList::iterator it_taskbegin;
- printf("-->[");
- for(it_taskbegin = tasklist.begin();
- it_taskbegin != tasklist.end();
- it_taskbegin++) {
- printf(" %d", it_taskbegin->id);
- }
- printf("]\n");
-
- std::string buf_str;
-
- MsgVector msglist = parse_msg(data);
- MsgVector::iterator it_msg;
- for(it_msg = msglist.begin(); it_msg != msglist.end(); it_msg++) {
- msg_t m = *it_msg;
- switch(m.cmd) {
- case cmd::add: {
- printf("Handling add cmd\n");
-
- // task_t t = create_task(m.add.title, m.add.desc,
- // m.add.x, m.add.y);
- task_t t = create_task(m.add.title, m.add.desc,
- m.add.parent_id);
- tasklist.insert(t);
- // buf_len = sprintf(buf, "add %d %s %s %d %d;",
- // t.id, t.title.c_str(), t.desc.c_str(),
- // t.x, t.y);
-
- // msg_t tm = create_msg(cmd::add, t);
- // std::string str = msg_tostring(tm);
- buf_str = msg_tostring(create_msg(cmd::add, t));
-
- printf("Adding task: %s\n", buf_str.c_str());
- break;
- }
- case cmd::del: {
- printf("Delete\n");
- printf("Deleting task with id %d\n", m.del.id);
-
- // todo: delete all children recursively
- task_t deleted_task;
- // tasklist.erase(m.del.id, &deleted_task);
- bool id_found = false;
- TaskList::iterator it;
- for(it = tasklist.begin(); it != tasklist.end(); it++) {
- task_t &t = *it;
- if(t.id == m.del.id) {
- id_found = true;
- deleted_task = t;
- tasklist.erase(it);
- break;
- }
- }
-
- if(!id_found) {
- printf("\t!!!Could not locate task with id %d\n", m.del.id);
- }
- else {
- // buf_len = sprintf(buf, "del %d;", m.del.id);
- buf_str = msg_tostring(create_msg(cmd::del, deleted_task));
- printf("Deleting task: %s\n", buf_str.c_str());
- }
- break;
- }
- case cmd::move: {
- printf("Move\n");
-
- printf("Moving task with id %d to %d\n", m.move.id, m.move.parent_id);
-
- bool id_found = false;
- TaskList::iterator it;
-
- // int x = m.move.x / 300 * 300;
-
- task_t moved_task;
- for(it = tasklist.begin(); it != tasklist.end(); it++) {
- task_t &t = *it;
- if(t.id == m.move.id) {
- id_found = true;
- // t.x = x;
- // t.y = m.move.y;
- t.parent_id = m.move.parent_id;
- moved_task = t;
- tasklist.move(t);
- break;
- }
- }
-
- if(!id_found) {
- printf("\t!!!Could not locate task with id %d\n", m.move.id);
- }
-
- // buf_len = sprintf(buf, "move %d %d %d;", m.move.id, x, m.move.y);
- buf_str = msg_tostring(create_msg(cmd::move, moved_task));
- printf("Moving task: %s\n", buf_str.c_str());
- break;
- }
- case cmd::update: {
- printf("Updating %d\n", m.update.id);
-
- bool id_found = false;
- TaskList::iterator it;
-
- task_t updated_task;
- for(it = tasklist.begin(); it != tasklist.end(); it++) {
- task_t &t = *it;
- if(t.id == m.update.id) {
- id_found = true;
- t.title = m.update.title;
- t.desc = m.update.desc;
- updated_task = t;
- break;
- }
- }
-
- if(!id_found) {
- printf("\t!!!Could not locate task with id %d\n", m.update.id);
- }
-
- buf_str = msg_tostring(create_msg(cmd::update, updated_task));
- printf("Updating task: %s\n", buf_str.c_str());
-
- break;
- }
- default:
- printf("Wrong command :(\n");
- break;
- }
-
- if(!save_tasklist_to_file(tasklist, "/tmp/muniad.db")) {
- printf("Could not flush db to file\n");
- }
- else {
- // XmlParser xml("/tmp/muniad.db");
- // xml.parse();
- // tasklist = xml.tasklist;
- }
-
- // print tasklist
- TaskList::iterator it_taskend;
- printf("<--[");
- for(it_taskend = tasklist.begin();
- it_taskend != tasklist.end();
- it_taskend++) {
- printf(" %d", it_taskend->id);
- }
- printf("]\n");
-
- // msg.payload = malloc(LWS_SEND_BUFFER_PRE_PADDING + len + LWS_SEND_BUFFER_POST_PADDING);
- // msg.len = len;
- // memcpy((char *)msg.payload + LWS_SEND_BUFFER_PRE_PADDING, in, len);
- // if(ringbuffer_head == (MAX_MESSAGE_QUEUE - 1)) ringbuffer_head = 0;
- // else ringbuffer_head++;
- /*
- msg.payload = malloc(LWS_SEND_BUFFER_PRE_PADDING + buf_len
- + LWS_SEND_BUFFER_POST_PADDING);
- msg.len = buf_len;
- memcpy((char *)msg.payload + LWS_SEND_BUFFER_PRE_PADDING,
- buf, buf_len);
- if(ringbuffer_head == (MAX_MESSAGE_QUEUE - 1)) ringbuffer_head = 0;
- else ringbuffer_head++;
- */
+ MessageList mlst = parse_msg(data);
+ MessageList omsgs = handle_msg(mlst);
+ MessageList::iterator omi = omsgs.begin();
+ while(omi != omsgs.end()) {
std::string msg;
msg.append((size_t)LWS_SEND_BUFFER_PRE_PADDING, ' ');
- // msg.append(buf, buf_len);
- msg.append(buf_str);
+ msg.append(msg_tostring(*omi));
msg.append((size_t)LWS_SEND_BUFFER_POST_PADDING, ' ');
-
- std::map<struct libwebsocket *, std::queue<std::string> >::iterator it = msgqueue.begin();
- while(it != msgqueue.end()) {
- printf("!!\n");
- it->second.push(msg);
- it++;
+
+ std::set<clientid_t> clients =
+ connection_handler.observerlist(omi->nodes);
+
+ std::set<clientid_t>::iterator ci = clients.begin();
+ while(ci != clients.end()) {
+ msgqueue[*ci].push(msg);
+ ci++;
}
- /*
- if(((ringbuffer_head - pss->ringbuffer_tail) % MAX_MESSAGE_QUEUE) > (MAX_MESSAGE_QUEUE - 10))
- libwebsocket_rx_flow_control(wsi, 0);
- */
- libwebsocket_callback_on_writable_all_protocol(libwebsockets_get_protocol(wsi));
- //libwebsocket_rx_flow_control(wsi, 0);
+
+ omi++;
}
- }
- break;
- /*
- * this just demonstrates how to use the protocol filter. If you won't
- * study and reject connections based on header content, you don't need
- * to handle this callback
- */
- case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION:
- printf("LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION\n");
- dump_handshake_info((struct lws_tokens *)(long)user);
- /* you could return non-zero here and kill the connection */
+ libwebsocket_callback_on_writable_all_protocol(
+ libwebsockets_get_protocol(wsi));
+ }
break;
default:
diff --git a/src/task_proto.h b/src/task_proto.h
index fe57752..160dee9 100644
--- a/src/task_proto.h
+++ b/src/task_proto.h
@@ -32,7 +32,7 @@
#include <libwebsockets.h>
-int callback_lws_task(struct libwebsocket_context * context,
+int callback_lws_task(struct libwebsocket_context *context,
struct libwebsocket *wsi,
enum libwebsocket_callback_reasons reason,
void *user, void *in, size_t len);
diff --git a/src/xmlparser.cc b/src/xmlparser.cc
index 7bc7e0a..97a8bdb 100644
--- a/src/xmlparser.cc
+++ b/src/xmlparser.cc
@@ -75,7 +75,7 @@ void XmlParser::startTag(std::string name, attributes_t &attr) {
task = new task_t();
task->id = atoi(xml_decode(attr["id"]).c_str());
- task->parent_id = atoi(xml_decode(attr["parent_id"]).c_str());
+ // task->parent_id = atoi(xml_decode(attr["parent_id"]).c_str());
}
}
@@ -83,7 +83,7 @@ void XmlParser::startTag(std::string name, attributes_t &attr) {
void XmlParser::endTag(std::string name) {
// printf("end: %s\n", name.c_str());
if (name == "task") {
- if(task) tasklist.insert(*task);
+ // if(task) tasklist.push_back(*task);
}
else if (name == "title") {
if(task) task->title = characterbuf;