summaryrefslogtreecommitdiff
path: root/src/task_proto.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/task_proto.cc')
-rw-r--r--src/task_proto.cc282
1 files changed, 30 insertions, 252 deletions
diff --git a/src/task_proto.cc b/src/task_proto.cc
index 6a4e762..cf647d5 100644
--- a/src/task_proto.cc
+++ b/src/task_proto.cc
@@ -35,8 +35,10 @@
#include "task.h"
#include "messageparser.h"
-#include "xmlparser.h"
+#include "messagehandler.h"
+#include "connectionhandler.h"
+#if 0
static void dump_handshake_info(struct lws_tokens *lwst)
{
int n;
@@ -74,6 +76,7 @@ static void dump_handshake_info(struct lws_tokens *lwst)
fprintf(stderr, "\n");
}
}
+#endif
static std::map<struct libwebsocket *, std::queue<std::string> > msgqueue;
@@ -82,64 +85,18 @@ int callback_lws_task(struct libwebsocket_context * context,
enum libwebsocket_callback_reasons reason,
void *user, void *in, size_t len)
{
- int n;
- struct per_session_data__lws_task *pss =
- (struct per_session_data__lws_task *)user;
-
switch (reason) {
- printf("============= Reason %d\n", reason);
- case LWS_CALLBACK_ESTABLISHED:
- {
- fprintf(stderr, "callback_lws_task: LWS_CALLBACK_ESTABLISHED\n");
- // pss->ringbuffer_tail = ringbuffer_head;
- pss->wsi = wsi;
- /*
-
- // send all current tasks
- // char buf[512];
- std::string init_str;
- TaskList::iterator it;
- for(it = tasklist.begin(); it != tasklist.end(); it++) {
- task_t &t = *it;
-
- printf("add %d %s %s %d;",
- t.id, t.title.c_str(), t.desc.c_str(), t.parent_id);
-
- // init_str.append(task_cmd(t));
- init_str += msg_tostring(create_msg(cmd::add, t));
- }
-
- std::string msg;
- msg.append((size_t)LWS_SEND_BUFFER_PRE_PADDING, ' ');
- msg.append(init_str);
- msg.append((size_t)LWS_SEND_BUFFER_POST_PADDING, ' ');
- msgqueue[wsi].push(msg);
-
- libwebsocket_callback_on_writable(context, wsi);
- */
- }
- break;
-
- case LWS_CALLBACK_CLOSED:
- {
- printf("Closing connection\n");
- msgqueue.erase(wsi);
- }
- break;
-
case LWS_CALLBACK_SERVER_WRITEABLE:
{
- printf("LWS_CALLBACK_SERVER_WRITEABLE\n");
-
- // if(pss->ringbuffer_tail != ringbuffer_head) {
if(msgqueue[wsi].size() > 0) {
std::string msg = msgqueue[wsi].front();
msgqueue[wsi].pop();
- n = libwebsocket_write(wsi, (unsigned char *)
- msg.c_str() +
- LWS_SEND_BUFFER_PRE_PADDING,
- msg.length() - LWS_SEND_BUFFER_POST_PADDING - LWS_SEND_BUFFER_PRE_PADDING,
- LWS_WRITE_TEXT);
+ int n = libwebsocket_write(wsi, (unsigned char *)
+ msg.c_str() +
+ LWS_SEND_BUFFER_PRE_PADDING,
+ msg.length() - LWS_SEND_BUFFER_POST_PADDING -
+ LWS_SEND_BUFFER_PRE_PADDING,
+ LWS_WRITE_TEXT);
if(n < 0) {
fprintf(stderr, "ERROR writing to socket");
exit(1);
@@ -153,11 +110,13 @@ int callback_lws_task(struct libwebsocket_context * context,
}
break;
+ /*
case LWS_CALLBACK_BROADCAST:
printf("LWS_CALLBACK_BROADCAST\n");
n = libwebsocket_write(wsi, (unsigned char*)in, len, LWS_WRITE_TEXT);
if (n < 0) fprintf(stderr, "task write failed\n");
break;
+ */
case LWS_CALLBACK_RECEIVE:
{
@@ -167,211 +126,30 @@ int callback_lws_task(struct libwebsocket_context * context,
std::string data;
data.append((char*)in, len);
- /*
- struct a_message &msg = ringbuffer[ringbuffer_head];
- if(msg.payload) {
- free(msg.payload);
- msg.payload = NULL;
- }
-
- */
- // char buf[1024];
- // size_t buf_len = 0;
-
- // print tasklist
- TaskList::iterator it_taskbegin;
- printf("-->[");
- for(it_taskbegin = tasklist.begin();
- it_taskbegin != tasklist.end();
- it_taskbegin++) {
- printf(" %d", it_taskbegin->id);
- }
- printf("]\n");
-
- std::string buf_str;
-
- MsgVector msglist = parse_msg(data);
- MsgVector::iterator it_msg;
- for(it_msg = msglist.begin(); it_msg != msglist.end(); it_msg++) {
- msg_t m = *it_msg;
- switch(m.cmd) {
- case cmd::add: {
- printf("Handling add cmd\n");
-
- // task_t t = create_task(m.add.title, m.add.desc,
- // m.add.x, m.add.y);
- task_t t = create_task(m.add.title, m.add.desc,
- m.add.parent_id);
- tasklist.insert(t);
- // buf_len = sprintf(buf, "add %d %s %s %d %d;",
- // t.id, t.title.c_str(), t.desc.c_str(),
- // t.x, t.y);
-
- // msg_t tm = create_msg(cmd::add, t);
- // std::string str = msg_tostring(tm);
- buf_str = msg_tostring(create_msg(cmd::add, t));
-
- printf("Adding task: %s\n", buf_str.c_str());
- break;
- }
- case cmd::del: {
- printf("Delete\n");
- printf("Deleting task with id %d\n", m.del.id);
-
- // todo: delete all children recursively
- task_t deleted_task;
- // tasklist.erase(m.del.id, &deleted_task);
- bool id_found = false;
- TaskList::iterator it;
- for(it = tasklist.begin(); it != tasklist.end(); it++) {
- task_t &t = *it;
- if(t.id == m.del.id) {
- id_found = true;
- deleted_task = t;
- tasklist.erase(it);
- break;
- }
- }
-
- if(!id_found) {
- printf("\t!!!Could not locate task with id %d\n", m.del.id);
- }
- else {
- // buf_len = sprintf(buf, "del %d;", m.del.id);
- buf_str = msg_tostring(create_msg(cmd::del, deleted_task));
- printf("Deleting task: %s\n", buf_str.c_str());
- }
- break;
- }
- case cmd::move: {
- printf("Move\n");
-
- printf("Moving task with id %d to %d\n", m.move.id, m.move.parent_id);
-
- bool id_found = false;
- TaskList::iterator it;
-
- // int x = m.move.x / 300 * 300;
-
- task_t moved_task;
- for(it = tasklist.begin(); it != tasklist.end(); it++) {
- task_t &t = *it;
- if(t.id == m.move.id) {
- id_found = true;
- // t.x = x;
- // t.y = m.move.y;
- t.parent_id = m.move.parent_id;
- moved_task = t;
- tasklist.move(t);
- break;
- }
- }
-
- if(!id_found) {
- printf("\t!!!Could not locate task with id %d\n", m.move.id);
- }
-
- // buf_len = sprintf(buf, "move %d %d %d;", m.move.id, x, m.move.y);
- buf_str = msg_tostring(create_msg(cmd::move, moved_task));
- printf("Moving task: %s\n", buf_str.c_str());
- break;
- }
- case cmd::update: {
- printf("Updating %d\n", m.update.id);
-
- bool id_found = false;
- TaskList::iterator it;
-
- task_t updated_task;
- for(it = tasklist.begin(); it != tasklist.end(); it++) {
- task_t &t = *it;
- if(t.id == m.update.id) {
- id_found = true;
- t.title = m.update.title;
- t.desc = m.update.desc;
- updated_task = t;
- break;
- }
- }
-
- if(!id_found) {
- printf("\t!!!Could not locate task with id %d\n", m.update.id);
- }
-
- buf_str = msg_tostring(create_msg(cmd::update, updated_task));
- printf("Updating task: %s\n", buf_str.c_str());
-
- break;
- }
- default:
- printf("Wrong command :(\n");
- break;
- }
-
- if(!save_tasklist_to_file(tasklist, "/tmp/muniad.db")) {
- printf("Could not flush db to file\n");
- }
- else {
- // XmlParser xml("/tmp/muniad.db");
- // xml.parse();
- // tasklist = xml.tasklist;
- }
-
- // print tasklist
- TaskList::iterator it_taskend;
- printf("<--[");
- for(it_taskend = tasklist.begin();
- it_taskend != tasklist.end();
- it_taskend++) {
- printf(" %d", it_taskend->id);
- }
- printf("]\n");
-
- // msg.payload = malloc(LWS_SEND_BUFFER_PRE_PADDING + len + LWS_SEND_BUFFER_POST_PADDING);
- // msg.len = len;
- // memcpy((char *)msg.payload + LWS_SEND_BUFFER_PRE_PADDING, in, len);
- // if(ringbuffer_head == (MAX_MESSAGE_QUEUE - 1)) ringbuffer_head = 0;
- // else ringbuffer_head++;
- /*
- msg.payload = malloc(LWS_SEND_BUFFER_PRE_PADDING + buf_len
- + LWS_SEND_BUFFER_POST_PADDING);
- msg.len = buf_len;
- memcpy((char *)msg.payload + LWS_SEND_BUFFER_PRE_PADDING,
- buf, buf_len);
- if(ringbuffer_head == (MAX_MESSAGE_QUEUE - 1)) ringbuffer_head = 0;
- else ringbuffer_head++;
- */
+ MessageList mlst = parse_msg(data);
+ MessageList omsgs = handle_msg(mlst);
+ MessageList::iterator omi = omsgs.begin();
+ while(omi != omsgs.end()) {
std::string msg;
msg.append((size_t)LWS_SEND_BUFFER_PRE_PADDING, ' ');
- // msg.append(buf, buf_len);
- msg.append(buf_str);
+ msg.append(msg_tostring(*omi));
msg.append((size_t)LWS_SEND_BUFFER_POST_PADDING, ' ');
-
- std::map<struct libwebsocket *, std::queue<std::string> >::iterator it = msgqueue.begin();
- while(it != msgqueue.end()) {
- printf("!!\n");
- it->second.push(msg);
- it++;
+
+ std::set<clientid_t> clients =
+ connection_handler.observerlist(omi->nodes);
+
+ std::set<clientid_t>::iterator ci = clients.begin();
+ while(ci != clients.end()) {
+ msgqueue[*ci].push(msg);
+ ci++;
}
- /*
- if(((ringbuffer_head - pss->ringbuffer_tail) % MAX_MESSAGE_QUEUE) > (MAX_MESSAGE_QUEUE - 10))
- libwebsocket_rx_flow_control(wsi, 0);
- */
- libwebsocket_callback_on_writable_all_protocol(libwebsockets_get_protocol(wsi));
- //libwebsocket_rx_flow_control(wsi, 0);
+
+ omi++;
}
- }
- break;
- /*
- * this just demonstrates how to use the protocol filter. If you won't
- * study and reject connections based on header content, you don't need
- * to handle this callback
- */
- case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION:
- printf("LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION\n");
- dump_handshake_info((struct lws_tokens *)(long)user);
- /* you could return non-zero here and kill the connection */
+ libwebsocket_callback_on_writable_all_protocol(
+ libwebsockets_get_protocol(wsi));
+ }
break;
default: