From 5ccdf6dd987be086a8712a4960f770122b1b499b Mon Sep 17 00:00:00 2001
From: deva <deva>
Date: Fri, 15 Aug 2008 09:37:36 +0000
Subject: Changes for read-loop-server.

---
 server/src/daemon.cc            |   2 +-
 server/src/pracrod.cc           |  25 ++++++
 server/src/saxparser.h          |   1 -
 server/src/server.cc            | 103 ++++++++++++++++-------
 server/src/tcpsocket.cc         | 178 +++++++++++++++++++++++++++-------------
 server/src/tcpsocket.h          |   7 +-
 server/src/transactionparser.cc |  42 +++++++++-
 server/src/transactionparser.h  |   6 +-
 8 files changed, 266 insertions(+), 98 deletions(-)

(limited to 'server/src')

diff --git a/server/src/daemon.cc b/server/src/daemon.cc
index 655a234..6aed66e 100644
--- a/server/src/daemon.cc
+++ b/server/src/daemon.cc
@@ -153,7 +153,7 @@ int Daemon::run(const char *user, const char* group, bool detach)
   setsid();
 
   signal (SIGTERM, SIG_IGN);
-  signal (SIGHUP, SIG_IGN);
+  //  signal (SIGHUP, SIG_IGN);
   if(detach) signal (SIGINT, SIG_IGN); // Don't disable Ctrl+c when running in foreground.
 
   return daemon_main();
diff --git a/server/src/pracrod.cc b/server/src/pracrod.cc
index 6ee1817..6ca21db 100644
--- a/server/src/pracrod.cc
+++ b/server/src/pracrod.cc
@@ -33,6 +33,9 @@
 #include <sys/types.h>
 #include <unistd.h>
 
+// For waitpid
+#include <sys/wait.h>
+
 // For signal
 #include <signal.h>
 
@@ -85,6 +88,24 @@ static const char usage_str[] =
 
 ConfigurationParser *configparser = NULL;
 
+bool pracro_is_running = true;
+
+void ctrl_c(int)
+{
+  //  printf("Ctrl+c\n");
+  pracro_is_running = false;
+}
+
+void childwait(int)
+{
+  //  printf("childwait\n");
+
+  pid_t pid;
+  while((pid = waitpid(-1, NULL, WNOHANG)) > 0) {
+    //    printf("\tOne down!\n");
+  }
+}
+
 void reload(int)
 {
   int port;
@@ -183,6 +204,10 @@ int main(int argc, char *argv[])
     group = strdup(Conf::server_group.c_str());
   }
 
+  signal(SIGHUP, reload);
+  signal(SIGCLD, childwait);
+  if(foreground) signal (SIGINT, ctrl_c);
+
   PracroDaemon daemon;
   daemon.run(user, group, !foreground);
 
diff --git a/server/src/saxparser.h b/server/src/saxparser.h
index 2ee4f43..67a86b7 100644
--- a/server/src/saxparser.h
+++ b/server/src/saxparser.h
@@ -47,7 +47,6 @@ public:
 protected:
   virtual int readData(char *data, size_t size) { return 0; }
 
-private:
   XML_Parser p;
 };
 
diff --git a/server/src/server.cc b/server/src/server.cc
index a419181..d2921b8 100644
--- a/server/src/server.cc
+++ b/server/src/server.cc
@@ -52,6 +52,8 @@
 static std::string error_box(std::string message)
 {
   std::string errorbox =
+    "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+    "<pracro version=\"1.0\">\n"
     "  <course name=\"error\">\n"
     "    <macro name=\"error\">\n"
     "      <window caption=\"ERROR!\" height=\"300\" layout=\"vbox\" name=\"error\" width=\"480\">\n"
@@ -59,7 +61,8 @@ static std::string error_box(std::string message)
     "        <button action=\"cancel\" caption=\"Luk\" name=\"cancel\"/>\n"
     "      </window>\n"
     "    </macro>\n"
-    "  </course>\n";
+    "  </course>\n"
+    "</pracro>\n";
   return errorbox;
 }
 
@@ -69,16 +72,13 @@ public:
   : Exception("Macro " + r.macro + " not found in course " + r.course) {}
 };
 
-static void connection(TCPSocket &socket)
+static std::string handleTransaction(Transaction &transaction)
 {
-  socket.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n");
-  socket.write("<pracro version=\"1.0\">\n");
+  std::string answer;
+  answer += "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n";
+  answer += "<pracro version=\"1.0\">\n";
 
   try {
-    Transaction transaction;
-    TransactionParser parser(socket, transaction);
-    parser.parse();
-
     Database db;
 
     //
@@ -112,8 +112,6 @@ static void connection(TCPSocket &socket)
     while(i != transaction.requests.end()) {
       Request &request = *i;
 
-      std::string answer;
-
       printf("Handling request - macro: %s, course: %s\n",
              request.macro.c_str(), request.course.c_str());
 
@@ -201,26 +199,72 @@ static void connection(TCPSocket &socket)
 
       answer += "  </course>\n";
 
-      socket.write(answer);
       i++;
     }
+
+    answer += "</pracro>\n";
+
   } catch( PGSTD::runtime_error &e ) {
-    socket.write(error_box(xml_encode(std::string("PostgreSQL server error:\n") + e.what())));
+    answer = error_box(xml_encode(std::string("PostgreSQL server error:\n") + e.what()));
   } catch( std::exception &e ) {
-    socket.write(error_box(xml_encode(e.what())));
+    answer = error_box(xml_encode(e.what()));
   }
 
-  socket.write("</pracro>\n");
+  return answer;
 }
 
 
+static void handleConnection(TCPSocket *socket)
+{
+  size_t bufsize = 256;
+  int size;
+  char buf[256];
+  memset(buf, 0, bufsize);
+  
+  Transaction *transaction = NULL;
+  TransactionParser *parser = NULL;
+  
+  while((size = socket->read(buf, bufsize)) > 0) {
+    
+    if(transaction == NULL) {
+      transaction = new Transaction();
+      parser = new TransactionParser(transaction);
+    }
+
+    printf("Got %d bytes in read loop\n", size);
+    if(parser->parse(buf)) {
+      socket->write(handleTransaction(*transaction));
+
+      delete transaction;
+      transaction = NULL;
+
+      delete parser;
+      parser = NULL;
+    }
+    memset(buf, 0, bufsize);
+  }
+
+  if(transaction) {
+    delete transaction;
+    transaction = NULL;
+
+    delete parser;
+    parser = NULL;
+  }
+
+  printf("Out of read loop!\n");
+}
+
+//#define NON_FORKING
+#include <sys/socket.h>
+extern bool pracro_is_running;
 void server()
 {
   port_t port = Conf::server_port;
   TCPSocket *socket = NULL;
   
   try {
-    socket = new TCPSocket();
+    socket = new TCPSocket("Listen socket");
     socket->listen(port);
   } catch (Exception &e) {
     fprintf(stderr, "Error during parsing:\n%s\n",
@@ -230,7 +274,7 @@ void server()
     return;
   }
 
-  while(socket->connected()) {
+  while(pracro_is_running && socket->connected()) {
 
     { // Reload if new port is assigned.
       int old_port = port;
@@ -239,41 +283,44 @@ void server()
       if(port != old_port) {
         // Start listening on the new port
         delete socket;
-        socket = new TCPSocket();
+        socket = new TCPSocket("Listen socket (reloaded)");
         socket->listen(port);
       }
     }
 
-    TCPSocket child = socket->accept();
-    if(child.connected()) {
-      //socket->disconnect();
-      connection(child);
-      //delete socket;
+    TCPSocket *child = socket->accept();
+    if(child) {
 
-      /*
+#ifndef NON_FORKING
       switch(fork()) {
       case -1: // error
         fprintf(stderr, "Could not fork: %s\n", strerror(errno));
         break;
         
       case 0: // child
-        socket->disconnect();
-        connection(child);
         delete socket;
+#endif/*NON_FORKING*/
+        handleConnection(child);
+        delete child;
+#ifndef NON_FORKING
         return;
         
       default: // parent
-        child.disconnect();
+        delete child;
         break;
       }
-      */
+#endif/*NON_FORKING*/
+
     }
   }
 
+  //socket->shutdown();
   delete socket;
-  fprintf(stderr, "Oups... dropped out of the main loop\n");
+
+  printf("Server gracefully shut down.\n");
 }
 
+
 #ifdef TEST_SERVER
 
 char request[] = 
diff --git a/server/src/tcpsocket.cc b/server/src/tcpsocket.cc
index 4bb7a4b..b6049c7 100644
--- a/server/src/tcpsocket.cc
+++ b/server/src/tcpsocket.cc
@@ -27,6 +27,9 @@
 #include "tcpsocket.h"
 
 #include "debug.h"
+#include <config.h>
+
+//#define WITH_DEBUG
 
 // for gethostbyname
 #include <netdb.h>
@@ -74,17 +77,28 @@
 #include <unistd.h>
 #include <fcntl.h>
 
-TCPSocket::TCPSocket()
+TCPSocket::TCPSocket(std::string name, int sock)
   throw(TCPSocketException)
 {
-  if((sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
-    throw TCPSocketException(strerror(errno));
+  this->name = name;
+  if(sock == -1) {
+    if((sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
+      throw TCPSocketException(strerror(errno));
+    }
   }
   isconnected = false;
+  this->sock = sock;
+
+#ifdef WITH_DEBUG
+   printf("TCPSocket %s: %p %d (%d)\n", name.c_str(), this, sock, getpid());
+#endif/*WITH_DEBUG*/
 }
 
 TCPSocket::~TCPSocket()
 {
+#ifdef WITH_DEBUG
+  printf("~TCPSocket %s: %p %d (%d)\n", name.c_str(), this, sock, getpid());
+#endif/*WITH_DEBUG*/
   disconnect();
 }
 
@@ -94,13 +108,8 @@ void TCPSocket::listen(unsigned short int port)
   throw(TCPListenException)
 {
 
-  if(sock == -1) {
-    throw TCPListenException("Socket not initialized.");
-  }
-
-  if(isconnected) {
-    throw TCPListenException("Socket already connected.");
-  }
+  if(sock == -1) throw TCPListenException("Socket not initialized.");
+  if(isconnected) throw TCPListenException("Socket already connected.");
 
   struct sockaddr_in socketaddr;
   memset((char *) &socketaddr, sizeof(socketaddr), 0);
@@ -119,35 +128,52 @@ void TCPSocket::listen(unsigned short int port)
   isconnected = true;
 }
 
-
+/**
+ **
+ ** Accept connections and block until one gets in.
+ ** Return the new connection on incoming.
+ ** It throws exceptions if an error occurres.
+ ** On interrupts, it returns NULL
+ **
+ **/
 static int _accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen)
 {return accept(sockfd, addr, addrlen);}
-TCPSocket TCPSocket::accept()
+TCPSocket *TCPSocket::accept()
   throw(TCPAcceptException)
 {
-  TCPSocket child;
-
-  if(sock == -1) {
-    throw TCPAcceptException("Socket not initialized.");
-  }
-
-  if(!isconnected) {
-    throw TCPAcceptException("Socket not connected.");
-  }
+  if(sock == -1) throw TCPAcceptException("Socket not initialized.");
+  if(!isconnected) throw TCPAcceptException("Socket not connected.");
 
   // accept new connection and get its connection descriptor
   struct sockaddr_in ssocketaddr;
   int csalen = sizeof(ssocketaddr);
 
-  child.disconnect(); // We need to close the existing socket
-  child.sock = _accept(sock, (struct sockaddr*)&ssocketaddr, (socklen_t*)&csalen);
-
-  if (child.sock == -1) {
-    throw TCPAcceptException(std::string("accept failed - ") + strerror(errno));
+  // Select
+  fd_set fset;
+  int ret;
+  FD_ZERO(&fset);
+  FD_SET(sock, &fset);
+  if( (ret = select (sock+1, &fset, NULL, NULL, NULL)) < 0) { 
+    if(errno == EINTR) {
+      printf("Accept got interrupt!\n");
+      return NULL; // a signal caused select to return. That is OK with me
+    } else {
+      throw TCPAcceptException("Select on socket failed.");
+    }
+  }
+  if(FD_ISSET(sock, &fset)) {
+    int csock = _accept(sock, (struct sockaddr*)&ssocketaddr, (socklen_t*)&csalen);
+    TCPSocket *child = new TCPSocket(name + "-child", csock);
+    
+    if (child->sock == -1) {
+      throw TCPAcceptException(std::string("accept failed - ") + strerror(errno));
+    }
+    child->isconnected = true;
+    return child;
+  } else {
+    printf("Accept returned with no socket - This should not happen!\n");
+    return NULL;
   }
-
-  child.isconnected = true;
-  return child;
 }
 
 
@@ -155,11 +181,8 @@ static int _connect(int sockfd, const struct sockaddr *serv_addr, socklen_t addr
 {return connect(sockfd, serv_addr, addrlen);}
 void TCPSocket::connect(std::string addr, unsigned short int port)
   throw(TCPConnectException)
-  
 {
-  if(isconnected) {
-    throw TCPConnectException("Socket already connected.", "", "");
-  }
+  if(isconnected) throw TCPConnectException("Socket already connected.", "", "");
 
 #ifndef BYPASS_STATICALLOCATIONS
   // Do DNS lookup
@@ -199,7 +222,13 @@ void TCPSocket::connect(std::string addr, unsigned short int port)
 void TCPSocket::disconnect()
 {
   if(sock != -1) {
-    close(sock);
+#ifdef WITH_DEBUG
+    printf("Closing TCPSocket %s: %p %d (%d)\n", name.c_str(), this, sock, getpid());
+#endif/*WITH_DEBUG*/
+    int ret = close(sock);
+    if(ret == -1) {
+      perror(name.c_str());
+    }
     sock = -1;
   }
   isconnected = false;
@@ -210,41 +239,69 @@ bool TCPSocket::connected()
   return sock != -1 && isconnected;
 }
 
-int TCPSocket::read(char *buf, int size)
+
+
+/**
+ **
+ ** Read read a number of bytes from the network.
+ ** It returns the number of bytes read.
+ ** It throws exceptions if an error occurres.
+ ** On interrupts, it returns -1
+ **
+ **/
+ssize_t _read(int fd, void *buf, size_t count) { return read(fd, buf, count); }
+int TCPSocket::read(char *buf, int size, long timeout)
   throw(TCPReadException)
 {
   int res = 0;
 
-  if(sock == -1) {
-    throw TCPReadException("Socket not initialized.");
-  }
-
-  if(!isconnected) {
-    throw TCPReadException("Socket is not connected.");
-  }
-
-  /*
-  if( (res = recv(sock, buf, size, MSG_WAITALL)) == -1 ) {
-    throw TCPReadException(strerror(errno));
-  }
-  */
+  if(sock == -1) throw TCPReadException("Socket not initialized.");
+  if(!isconnected) throw TCPReadException("Socket is not connected.");
+
+  // Select
+  struct timeval tv;
+  tv.tv_sec = 0;
+  tv.tv_usec = timeout;
+
+  struct timeval *ptv = NULL;
+  if(timeout >= 0) ptv = &tv;
+
+  fd_set fset;
+  int ret;
+  FD_ZERO(&fset);
+  FD_SET(sock, &fset);
+  ret = select (sock+1, &fset, NULL, NULL, ptv);
+  switch(ret) {
+  case -1:
+    if(errno == EINTR) {
+      printf("EINTR - got interrupt\n");
+      return -1; // a signal caused select to return. That is OK with me
+    } else {
+      throw TCPReadException("Select on socket (read) failed.");
+    }
+    break;
 
-  // Wait until something is ready to be read ( peek'a'loop ).
-  errno = EAGAIN;
-  while( recv(sock, buf, 1, MSG_PEEK) == -1 && errno == EAGAIN) {
-    usleep(10);
-  }
+  case 0:
+    // timeout
+    printf("Timeout\n");
+    break;
 
-  // Now read it
-  if( (res = recv(sock, buf, size, MSG_DONTWAIT)) == -1 ) {
-    throw TCPReadException(strerror(errno));
+  default:
+    if(FD_ISSET(sock, &fset)) {
+      //      res = recv(sock, buf, size, MSG_DONTWAIT);
+      if( (res = _read(sock, buf, size)) == -1 ) {
+        throw TCPReadException(strerror(errno));
+      }
+    } else {
+      printf("FD_ISSET failed (timeout?)\n");
+      return 0;
+    }
   }
 
-  //  fwrite(buf, size, 1, stdout); fflush(stdout);
-
   return res;
 }
 
+ssize_t _write(int fd, const void *buf, size_t count) { return write(fd, buf, count); } 
 int TCPSocket::write(char *data, int size)
   throw(TCPWriteException)
 {
@@ -257,10 +314,13 @@ int TCPSocket::write(char *data, int size)
   }
 
   int res;
-  if( (res = send(sock, data, size, MSG_WAITALL)) == -1 ) {
+  //  if( (res = send(sock, data, size, MSG_WAITALL)) == -1 ) {
+  if( (res = _write(sock, data, size)) == -1 ) {
     throw TCPWriteException(strerror(errno));
   }
 
+  printf("Outputted %d byes\n", res);
+
   return res;
 }
 
diff --git a/server/src/tcpsocket.h b/server/src/tcpsocket.h
index 393d40b..4771d10 100644
--- a/server/src/tcpsocket.h
+++ b/server/src/tcpsocket.h
@@ -103,7 +103,7 @@ public:
   /**
    * Constructor. Creates a new tcp socket.
    */
-  TCPSocket() throw(TCPSocketException);
+  TCPSocket(std::string name = "", int sock = -1) throw(TCPSocketException);
 
   /**
    * Destructor. Closes the tcp socket.
@@ -123,7 +123,7 @@ public:
    * Multiple accepts can be made on the same listening socket.
    * @return A connected TCPSocket ready to communicate.
    */
-  TCPSocket accept() throw(TCPAcceptException);
+  TCPSocket *accept() throw(TCPAcceptException);
     
   /**
    * Connects to a host for data transmission.
@@ -149,7 +149,7 @@ public:
    * @param size The maximum number of bytes to read in (the size of the buffer).
    * @return The actual number of bytes read.
    */
-  int read(char *buf, int size) throw(TCPReadException);
+  int read(char *buf, int size, long timeout = -1) throw(TCPReadException);
     
   /**
    * Writes bytes from a buffer to the socket.
@@ -175,6 +175,7 @@ public:
 private:
   bool isconnected;
   int sock;
+  std::string name;
 };
 
 
diff --git a/server/src/transactionparser.cc b/server/src/transactionparser.cc
index 0e64944..3d3024c 100644
--- a/server/src/transactionparser.cc
+++ b/server/src/transactionparser.cc
@@ -33,13 +33,16 @@
 #include <string>
 #include <map>
 
-TransactionParser::TransactionParser(TCPSocket &socket, Transaction &transaction)
+TransactionParser::TransactionParser(Transaction *transaction)
 {
-  this->transaction = &transaction;
-  this->socket = &socket;
+  this->transaction = transaction;
   done = false;
 }
 
+TransactionParser::~TransactionParser()
+{
+}
+
 void TransactionParser::startTag(std::string name, std::map< std::string, std::string> attributes)
 {
   if(name == "pracro") {
@@ -74,8 +77,39 @@ void TransactionParser::endTag(std::string name)
 
 int TransactionParser::readData(char *data, size_t size)
 {
+  printf("readData is not uasble with transaction parser!.\nUse parse(std::string) instead.\n");
+  return 0;
+/*
   if(done) return 0;
-  return socket->read(data, size);
+
+  int ret;
+
+  //  while((ret = socket->read(data, size, 1000)) == 0) { }
+  
+  if(ret == -1) {
+    printf("Transaction parser was interrupted.\n");
+    return 0;
+  }
+  
+  return ret;
+*/
+}
+
+bool TransactionParser::parse(std::string data)
+{
+  if(! XML_Parse(p, (char*)data.c_str(), data.size(), false) ) {
+    parseError((char*)data.c_str(), data.size(), XML_ErrorString(XML_GetErrorCode(p)), (int)XML_GetCurrentLineNumber(p));
+    return false;
+  }
+
+  if(done) {
+    if(! XML_Parse(p, "", 0, true) ) {
+      parseError("", 0, XML_ErrorString(XML_GetErrorCode(p)), (int)XML_GetCurrentLineNumber(p));
+      return false;
+    }
+  }
+
+  return done;
 }
 
 void TransactionParser::parseError(char *buf, size_t len, std::string error, int lineno)
diff --git a/server/src/transactionparser.h b/server/src/transactionparser.h
index b64d68a..3f477c9 100644
--- a/server/src/transactionparser.h
+++ b/server/src/transactionparser.h
@@ -33,19 +33,21 @@
 
 class TransactionParser : public SAXParser {
 public:
-  TransactionParser(TCPSocket &socket, Transaction &transaction);
+  TransactionParser(Transaction *transaction);
+  ~TransactionParser();
 
   void startTag(std::string name, std::map< std::string, std::string> attributes);
   void endTag(std::string name);
 
   void parseError(char *buf, size_t len, std::string error, int lineno);
 
+  bool parse(std::string data);
+
 protected:
   int readData(char *data, size_t size);
 
 private:
   Transaction *transaction;
-  TCPSocket *socket;
   bool done;
 };
 
-- 
cgit v1.2.3