/* LICENSE:
  =========================================================================
    CMPack'04 Source Code Release for OPEN-R SDK 1.1.5-r2 for ERS7
    Copyright (C) 2004 Multirobot Lab [Project Head: Manuela Veloso]
    School of Computer Science, Carnegie Mellon University
    All rights reserved.
  ========================================================================= */

#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <string.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include "WaveServer.h"
#include "WLDecoder.h"
#include "../../agent/headers/CircBufPacket.h"
#include "../../agent/headers/WaveLAN.h"

void WaveServer::init() {
  for(int socket_idx=0; socket_idx<NumAcceptingSockets; socket_idx++) {
    socket_accept_fd[socket_idx] = -1;
    socket_read_fd[socket_idx] = -1;
  }
}

void *SocketProcessor(WaveServerConfig *config) {
  int socket_read_fd=config->server.socket_read_fd[InSocketIdx];

  printf("starting socket processor on socket fd %d\n",socket_read_fd);

  WLDemuxer demuxer;
  demuxer.setCallback(config,(WLDecoder::PacketCallback)config->packetCallback);
  if(fcntl(socket_read_fd,F_SETFL,O_NONBLOCK)==-1) {
    perror("problem setting socket to nonblocking");
    pthread_exit(NULL);
  }

  struct pollfd poll_fds[1];
  poll_fds[0].fd = socket_read_fd;
  poll_fds[0].events  = POLLIN | POLLPRI;
  poll_fds[0].revents = 0;

  bool run;
  run = *config->Run;
  while(run) {
  
    poll(poll_fds,sizeof(poll_fds)/sizeof(poll_fds[0]),0);
    if(poll_fds[0].revents & (POLLIN | POLLPRI))
      run = run && demuxer.processInput(socket_read_fd);

    usleep(10000);

    sem_wait(config->RunSemaphore);
    run = run && *config->Run;
    sem_post(config->RunSemaphore);
  }

  if(close(socket_read_fd)==-1) {
    perror("problem closing read socket");
  }

  printf("stopping socket processor on socket fd %d\n",socket_read_fd);

  pthread_exit(NULL);

  return NULL;
}

bool SendMessage(WaveServer *server,NetMsgHeader *header) {
  int socket_write_fd;
  static char buf[4096];

  socket_write_fd = server->socket_read_fd[OutSocketIdx];

  int data_length = header->totalLength+1;

  *buf='\x80';
  memcpy(buf+1,((uchar *)header),data_length-1);

  int write_count;
  int data_written = 0;

  // printf("sending message, %d bytes\n",data_length);
  while(data_written < data_length &&
        ((write_count = write(socket_write_fd,buf+data_written,data_length-data_written)) > 0)) {
    data_written += write_count;
  }
  //printf("message sent\n");

  if(write_count==-1 && !(errno==EAGAIN || errno==EINTR))
    return false;

  // indicates disconnected socket since otherwise would get a EAGAIN
  if(write_count==0) {
    printf("write count was %d (errno=%d), stopping reads on socket (fd=%d)\n",write_count,errno,socket_write_fd);

    return false;
  }

  return true;
}

void *DefaultOutputProcessor(WaveServerConfig *config) {
  int socket_write_fd=config->server.socket_read_fd[OutSocketIdx];
  bool (*OutputCallback)(WaveServerConfig *)=config->outputCallback;

  printf("starting output test on socket fd %d\n",socket_write_fd);

  if(fcntl(socket_write_fd,F_SETFL,O_NONBLOCK)==-1) {
    perror("problem setting write socket to nonblocking");
  }

  struct pollfd poll_fds[1];
  poll_fds[0].fd = socket_write_fd;
  poll_fds[0].events = POLLOUT | POLLERR | POLLHUP | POLLNVAL;

  bool run;
  run = *config->Run;
  while(run) {
    //printf("polling output on write socket fd=%d\n",socket_write_fd);
    poll(poll_fds,sizeof(poll_fds)/sizeof(poll_fds[0]),0);
    if(poll_fds[0].revents & (POLLOUT)) {
      //printf("trying test send\n");
      if(OutputCallback!=NULL) {
        run = run && (*OutputCallback)(config);
      } else {
        // use pthread_cond_signal here
      }
    }
    if(poll_fds[0].revents & (POLLERR | POLLHUP | POLLNVAL)) {
      printf("client close detected\n");
      run = false;
    }

    usleep(100000);

    sem_wait(config->RunSemaphore);
    run = run && *config->Run;
    sem_post(config->RunSemaphore);
  }

  if(close(socket_write_fd)==-1) {
    perror("problem closing write socket");
  }

  printf("stopping output test on socket fd %d\n",socket_write_fd);

  pthread_exit(NULL);

  return NULL;
}

void *SocketServer(WaveServerConfig *config) {
  WaveServer *server=&config->server;

  server->init();

  sockaddr_in server_address[NumAcceptingSockets];

  protoent *protocol;
  int proto_num;

  protocol=getprotobyname("IP");
  proto_num = protocol->p_proto;

  for (int socket_idx=0; socket_idx<NumAcceptingSockets; socket_idx++) {
    server->socket_accept_fd[socket_idx] = socket(PF_INET, SOCK_STREAM, proto_num);
    if(server->socket_accept_fd[socket_idx] == -1) {
      fprintf(stderr,"unable to create accepting socket %d\n",socket_idx);
      exit(5);
    }
    
    memset(&server_address[socket_idx],0,sizeof(server_address[socket_idx]));
    server_address[socket_idx].sin_family = AF_INET;
    server_address[socket_idx].sin_port   = htons(config->ports[socket_idx]);
    server_address[socket_idx].sin_addr.s_addr = htonl(INADDR_ANY);
    if(inet_aton(config->hosts[socket_idx], &(server_address[socket_idx].sin_addr))==0) {
      fprintf(stderr,"Call to inet_aton failed on socket %d on host '%s'\n",socket_idx,config->hosts[socket_idx]);
      exit(6);
    }
    
    int val=1;
    if(setsockopt(server->socket_accept_fd[socket_idx],SOL_SOCKET,SO_REUSEADDR,&val,sizeof(int))==-1) {
      fprintf(stderr,"Call to setsockopt REUSEADDR failed on socket %d on host '%s' port %d: %s\n",
              socket_idx,config->hosts[socket_idx],config->ports[socket_idx],strerror(errno));
    }

    if(bind(server->socket_accept_fd[socket_idx],(sockaddr *)&server_address[socket_idx],sizeof(server_address[socket_idx]))!=0) {
      fprintf(stderr,"Call to bind failed on socket %d on host '%s' port %d: %s\n",
              socket_idx,config->hosts[socket_idx],config->ports[socket_idx],strerror(errno));
      exit(8);
    }
    
    if(listen(server->socket_accept_fd[socket_idx],1)!=0) {
      fprintf(stderr,"listen failed on socket %d: %s\n",
              socket_idx,strerror(errno));
      exit(9);
    }
    
    if(fcntl(server->socket_accept_fd[socket_idx],F_SETFL,O_NONBLOCK)==-1) {
      fprintf(stderr,"problem setting accept socket %d to nonblocking: %s\n",
              socket_idx,strerror(errno));
    }
  }

  struct pollfd poll_fds[NumAcceptingSockets];
  for (int socket_idx=0; socket_idx<NumAcceptingSockets; socket_idx++) {
    poll_fds[socket_idx].fd = server->socket_accept_fd[socket_idx];
    poll_fds[socket_idx].events  = POLLIN | POLLPRI;
    poll_fds[socket_idx].revents = 0;
  }

  bool connected[NumAcceptingSockets];
  for (int socket_idx=0; socket_idx<NumAcceptingSockets; socket_idx++) {
    connected[socket_idx] = false;
  }

  bool run;
  sem_wait(config->RunSemaphore);
  run = *config->Run;
  sem_post(config->RunSemaphore);
  while(run) {
    sockaddr_in connecting_sock;
    socklen_t connecting_sock_len;
    connecting_sock_len = sizeof(connecting_sock);

    if(poll(poll_fds,sizeof(poll_fds)/sizeof(poll_fds[0]),0)>=0){
      for(int socket_idx=0; socket_idx<NumAcceptingSockets; socket_idx++) {
        if(poll_fds[socket_idx].revents & (POLLIN | POLLPRI)) {
          printf("accepting a connection on socket %d\n",socket_idx);
          server->socket_read_fd[socket_idx]=accept(server->socket_accept_fd[socket_idx],(sockaddr *)&connecting_sock,&connecting_sock_len);
          if(server->socket_read_fd[socket_idx]==-1 && errno!=EAGAIN) {
            fprintf(stderr,"accept on in socket %d failed: %s",socket_idx,strerror(errno));
            run=false;
            continue;
          }
          if(server->socket_read_fd[socket_idx]==-1 && errno==EAGAIN) {
            continue;
          }
          
          connected[socket_idx] = true;
          
          if(socket_idx == InSocketIdx) {
            printf("pthread_create on socket processor\n");
            if((errno=pthread_create(&server->InputThread,NULL,(pthread_start)SocketProcessor,(void *)config))!=0) {
              perror("Problem creating socket input thread");
            }
            printf("phtread_create on socket processor done\n");
          }
          
          if(socket_idx == OutSocketIdx) {
            printf("pthread_create on output test\n");
            if(config->outputProcessor==NULL)
              config->outputProcessor = DefaultOutputProcessor;
            if((errno=pthread_create(&server->OutputThread,NULL,(pthread_start)config->outputProcessor,(void *)config))!=0) {
              perror("Problem creating output test thread");
            }
            printf("phtread_create on output test done\n");
          }
        }
      }
    }

    bool all_connected=true;
    for(int socket_idx=0; socket_idx<NumAcceptingSockets; socket_idx++)
      all_connected = all_connected && connected[socket_idx];
    
    if(all_connected) {
      printf("pthread_join on input socket processor\n");
      if((errno=pthread_join(server->InputThread,NULL))!=0) {
        perror("Problem joining on socket input thread");
      }
      printf("pthread_join on socket processor done\n");
    
      printf("pthread_join on output test thread\n");
      if((errno=pthread_join(server->OutputThread,NULL))!=0) {
        perror("Problem joining on output test thread");
      }
      printf("pthread_join on output test thread done\n");
    
      // don't bother checking return since normally the socket processing thread 
      // will close the socket and this will give an error
      for(int socket_idx=0; socket_idx<NumAcceptingSockets; socket_idx++)
        close(server->socket_read_fd[socket_idx]); 
      for(int socket_idx=0; socket_idx<NumAcceptingSockets; socket_idx++)
        connected[socket_idx] = false;

      if(config->runOnce) {
        sem_wait(config->RunSemaphore);
        *config->Run = false;
        sem_post(config->RunSemaphore);
      }
    }

    usleep(10000);

    sem_wait(config->RunSemaphore);
    run = *config->Run;
    sem_post(config->RunSemaphore);
  }

  // don't bother checking return since normally the socket processing thread 
  // will close the socket and this will give an error
  for(int socket_idx=0; socket_idx<NumAcceptingSockets; socket_idx++)
    close(server->socket_read_fd[socket_idx]); 
  for(int socket_idx=0; socket_idx<NumAcceptingSockets; socket_idx++) {
    if(close(server->socket_accept_fd[socket_idx])==-1) {
      fprintf(stderr,"problem closing accept socket %d: %s",socket_idx,strerror(errno));
    }
  }

  pthread_exit(NULL);

  return NULL;
}

//===========================================================================
//===========================================================================
//===========================================================================
//===========================================================================
//===========================================================================
//===========================================================================
// This version of the socket data system connects to the server running on
// the AIBO and listens to the returned data.

void *SocketServer2(WaveServerConfig *config) {

  WaveServer *server=&config->server;

  // Set the values of all of the socket_fds to -1
  server->init();

  sockaddr_in client_address;

  struct hostent* dog_hp = NULL;

  //=====================================================================
  // Set up the socket that listens to the dog
  
  if ((server->socket_read_fd[0]=socket(AF_INET,SOCK_STREAM,0)) <0) {
    perror("socket");
    exit(10);
  }

  if ((dog_hp = gethostbyname(config->hosts[0])) == NULL) {
    
    // This doesn't use errno... so perror probably won't work
    switch(h_errno) {
    case HOST_NOT_FOUND:
      fprintf(stderr,"The specified host is unknown.\n");
      break;
    case NO_ADDRESS:
      fprintf(stderr,"The requested name is valid but does not have an IP address.\n");
      break;
    case NO_RECOVERY:
      fprintf(stderr,"A non-recoverable name server error occurred.\n");
      break;
    case TRY_AGAIN:
      fprintf(stderr,"A temporary error occurred on an authoritative name server.  Try again later.\n");
      break;
    }
    exit(11);
  }
  memset((void*)&client_address, 
	 0, 
	 sizeof(client_address));
  memcpy(&client_address.sin_addr, 
	 dog_hp->h_addr, 
	 dog_hp->h_length);
  
  client_address.sin_family = AF_INET;
  client_address.sin_port = htons(config->ports[0]);

  //=====================================================================

  bool run;

  sem_wait(config->RunSemaphore);
  run = *config->Run;
  sem_post(config->RunSemaphore);

  while(run) {

    // First, check to see if the socket has connected.
    //   If not, try to connect and wait a while before trying again
    // 
    // When the socket has connected, start the thread

    fprintf(stderr,"Attempting to contact %s\n",
	    config->hosts[0]);
    while (connect(server->socket_read_fd[0], 
		   (struct sockaddr*) &client_address, 
		   sizeof(client_address)) < 0) {
      perror("connect");
      fprintf(stderr,"Could not connect to client\n");
      fprintf(stderr,"Sleeping for a second...\n");
      sleep(1);
    }
    
    printf("phtread_create on socket processor\n");
    if((errno=pthread_create(&server->InputThread,NULL,
			     (pthread_start)SocketProcessor,
			     (void *)config))!=0) {
      perror("Problem creating socket input thread");}
    printf("phtread_create on socket processor done\n");

    printf("pthread_join on input socket processor\n");
    if((errno=pthread_join(server->InputThread,NULL))!=0) {
      perror("Problem joining on socket input thread"); }
    printf("pthread_join on socket processor done\n");
    
    // don't bother checking return since normally the socket processing 
    // thread will close the socket and this will give an error
    close(server->socket_read_fd[0]); 

    if(config->runOnce) {
      sem_wait(config->RunSemaphore);
      *config->Run = false;
      sem_post(config->RunSemaphore);
    }

    usleep(10000);

    sem_wait(config->RunSemaphore);
    run = *config->Run;
    sem_post(config->RunSemaphore);
  }

  close(server->socket_read_fd[0]); 

  pthread_exit(NULL);

  return NULL;
}
