/* LICENSE:
  =========================================================================
    CMPack'04 Source Code Release for OPEN-R SDK 1.1.5-r2 for ERS7
    Copyright (C) 2004 Multirobot Lab [Project Head: Manuela Veloso]
    School of Computer Science, Carnegie Mellon University
    All rights reserved.
  ========================================================================= */

//
// Portions Copyright 2002 Sony Corporation 
//
// Permission to use, copy, modify, and redistribute this software for
// non-commercial use is hereby granted.
//
// This software is provided "as is" without warranty of any kind,
// either expressed or implied, including but not limited to the
// implied warranties of fitness for a particular purpose.
//

/** This is a converted version of the SPOut class to send data over
    the network. Unlike SPOut, no conversion of 8 bit data to 7 bit
    data is done as 8 bit data can travel over the network (unlike
    the way we do serial communications).

    How things work:

    Data automagically comes in the form of packets from our packet
    multiplexer. A call back routine (ProcessBuffer) is periodically
    called by the operating system. ProcessBuffer calls the
    sendPacket routine. sendPacket breaks the packet down into a byte
    stream and ends up (through subroutines) sending this bytestream
    via the sendByte function. So far this is all just like in the SPOut
    class, which this class was copied from.

    Here's where things differ: sendByte buffers each byte internally
    in the netBuffer array.  Data is actually sent over the network
    under two conditions:

    1. The observer has asserted that it is ready and, when that
       message arrives, there is pending data in netBuffer.

    2. The observer has asserted that it is ready and ProcessBuffer is
       about to end (we call the flush method here).

*/

#ifndef OPENR_STUBGEN
#define OPENR_STUBGEN
#endif

#include <iostream>

#include <OPENR/core_macro.h>
#include <OPENR/ObjcommTypes.h>

#include <OPENR/OPENR.h>
#include <OPENR/OPENRMessages.h>
#include <OPENR/OPENRAPI.h>

#include "../headers/AperiosMessageStructures.h"
#include "../headers/CircBufPacket.h"
#include "../headers/Config.h"
#include "../headers/Dictionary.h"
#include "../headers/DogTypes.h"
#include "../headers/MessageStructures.hh"
#include "../headers/PacketMux.h"
#include "../headers/Reporting.h"
#include "../headers/SharedMem.h"
#include "../headers/WaveLAN.h"
#include "../headers/SPOutEncoder.h"
#include "../headers/Utility.h"

#include "../headers/SystemUtility.h"
#include "WLOut.h"


#include <string.h>
#include <OPENR/OSyslog.h>
#include <OPENR/OPENRAPI.h>
#include <ant.h>
#include <EndpointTypes.h>
#include <TCPEndpointMsg.h>
#include "entry.h"

// Connection gate: set to true by ListenCont() and ReceiveCont(), reset to
// false by Close(). proc() and SendCont() only run processBuffer() while
// it is set.
bool listen_done = false;

//*************************************************************

// Stream used for this object's own text/trace output. It stays NULL until
// WLOut::DoStart() allocates it.
PacketStream *TextOutputStream = NULL;

// Compile-time switch for the "WL:..." trace messages emitted via pprintf().
static const bool debug = true;
// Non-zero enables packet processing in processBuffer(). The constructor
// may overwrite it with the "OutputOn" value from the wavelan config file.
int OutputOn = 1;
// Size of the staging packet's data buffer and the length limit handed to
// mux->getNextPacket() in processBuffer().
static const ulong MaxPacketLength=100*1024;

// Size of the data area that follows the PacketStreamCollection header in
// the text output region created by DoStart().
static const int WLOutputDataSize = 64*1024; // 64KB
// Arguments passed to allocateStream() for the text output stream.
static const int TotalTextOutput = 32*1024; // 32KB
static const int MaxTextMessages = 256; // maximum number of in flight text messages

/* Construct the wireless output object.

   Resets all bookkeeping members, allocates the network staging buffer
   and loads the sender ID and the OutputOn switch from the wavelan
   configuration file. */
WLOut::WLOut() 
{
  // Timer / shared memory bookkeeping.
  timerStarted     = false;
  needToSendBuffer = false;
  baseAddr         = NULL;

  // No packet stream collections are known until GotMemRegion() runs.
  mainOutPSC   = NULL;
  motionOutPSC = NULL;
  loggerOutPSC = NULL;
  mux = new PacketMuxRoundRobin();

  for(int region=0; region<MemoryId::NUM_MEMORY_IDS; ++region) {
    requestedRegion[region] = false;
  }

  quickCheck = true;
  gotMessage = true;

  // The queue of pending data buffers starts out empty, and the staging
  // buffer write position starts right after the space reserved for the
  // network message header.
  netBufIndex = NetworkBufferDataStart;
  numBufs = 0;
  for(int slot=0; slot<MaxDataBufs; ++slot) {
    dataLength[slot] = 0;
    dataBuf   [slot] = NULL;
  }

  const sError alloc_result =
    NewRegion(NetworkBufferSize, reinterpret_cast<void**>(&netBuffer));
  if (alloc_result != sSUCCESS) {
    // Allocation failed: leave a NULL buffer behind and report it.
    netBuffer = static_cast<uchar*>(NULL);
    cout << "Unable to allocate net buffer of size " << NetworkBufferSize << endl;
  }

  /* Load our sender ID from file */
  Dictionary *wlan_cfg = new Dictionary();

  char design[256];
  OPENR::GetRobotDesign(design);

  // Both robot designs currently read the same configuration file.
  const bool is_ers210 = (strcmp(design,"ERS-210")==0);
  if(is_ers210) {
    wlan_cfg->read("/MS/config/wavelan.cfg");
  } else {
    wlan_cfg->read("/MS/config/wavelan.cfg");
  }

  // Fall back to sender ID 1 when the key cannot be read.
  int id;
  if(!wlan_cfg->getValueInt("SenderID", id)) {
    id = 1;
  }
  wlan_cfg->getValueInt("OutputOn", OutputOn);
  SenderID = (uchar)id;

  delete wlan_cfg;
}


// OPEN-R init hook: loads the global configuration and sets up the
// inter-object communication data for this object.
// `event` is not used. Always returns oSUCCESS.
OStatus
WLOut::DoInit(const OSystemEvent& event)
{
  //cout << "--- WLOut::DoInit()" << endl;
  //cout << "    invoked." << endl;
  
  config.init();
  config.config("/MS/config/config.cfg");

  //
  // Initialize myself
  //
  // NOTE(review): NewComData / SetComData are presumably the OPEN-R
  // stub-generated macros that create and register this object's subjects
  // and observers -- confirm against the generated stub header.
  NewComData; 
  SetComData;
 
  // make sure the library doesn't drop data "for" us
  //   on this reliable communication channel
  observer[obsSharedMemRegionInfo]->SetBufCtrlParam(0,1,16);
  
  return oSUCCESS;
}


// OPEN-R start hook.
//
// Creates the text output region and its packet stream, publishes the
// region, asserts ready on every observer, requests the output regions of
// the other objects and finally brings up the TCP side (buffers for every
// connection slot, then a listen on slot 0).
//
// Returns oFAIL if any connection slot cannot be initialized, otherwise
// oSUCCESS. `event` is not used.
OStatus
WLOut::DoStart(const OSystemEvent& event)
{
  //cout << "--- WLOut::DoStart()" << endl;
  //cout << "    invoked." << endl;

  // Region layout: a PacketStreamCollection header followed by
  // WLOutputDataSize bytes of stream data.
  outTextRgn.init(sizeof(PacketStreamCollection)+WLOutputDataSize);
  void *region_base = outTextRgn.getData();

  PacketStreamCollection *text_psc = (PacketStreamCollection *)region_base;
  uchar *data_begin = ((uchar *)region_base) + sizeof(PacketStreamCollection);
  text_psc->init(data_begin, data_begin + WLOutputDataSize);

  // Single text stream used for this object's own trace output.
  const int stream_id = text_psc->allocateStream(TotalTextOutput,MaxTextMessages);
  TextOutputStream = text_psc->getStream(stream_id);
  TextOutputStream->type   = SPOutEncoder::TextLead;
  TextOutputStream->binary = false;

  // Publish the freshly created region.
  needToSendBuffer = true;
  sendOutputBuffer();

  for (int obs=0; obs<numOfObserver; ++obs) {
    if ( observer[obs]->AssertReady() == oSUCCESS ) {
      continue;
    }
    cout << "\tWOARF{" << obs << "}" << endl; // WLOut assert ready failed
  }

  requestRegions();

  // network setup
  ipstackRef = antStackRef("IPStack");

  for (int slot = 0; slot < ECHOSERVER_CONNECTION_MAX; slot++) {
    if (InitTCPConnection(slot) != oSUCCESS) {
      return oFAIL;
    }
  }

  Listen(0);

  return oSUCCESS;
}    

// OPEN-R stop hook: withdraws the ready state from every observer.
// `event` is not used. Always returns oSUCCESS.
OStatus
WLOut::DoStop(const OSystemEvent& event)
{
  //cout << "--- WLOut::DoStop()" << endl;
  //cout << "    invoked." << endl;

  int obs = 0;
  while (obs < numOfObserver) {
    observer[obs]->DeassertReady();
    ++obs;
  }

  return oSUCCESS;
}


// OPEN-R destroy hook. `event` is not used. Always returns oSUCCESS.
OStatus
WLOut::DoDestroy(const OSystemEvent& event)
{
  //cout << "--- WLOut::DoDestroy()" << endl;
  //cout << "    invoked." << endl;

  //
  // Destroy myself
  //
  // NOTE(review): DeleteComData is presumably the stub-generated
  // counterpart of NewComData used in DoInit() -- confirm in the stub.
  DeleteComData;

  return oSUCCESS;
}

// Ready notification for the register-region subject. Only records that a
// message arrived; processBuffer() reads and clears the flag to choose its
// time budget. `event` is not used.
void
WLOut::ReadyRegisterRegion(const OReadyEvent &event) {
  gotMessage = true;
}

void
WLOut::sendOutputBuffer() {
  //cout << "SharedMem::SendMemBlockID() invoked: memID " << regionId << "." << endl;
  OSubject* thisSubj = subject[sbjRegisterRegion];

  if(outTextRgn.validData() && needToSendBuffer) {
    needToSendBuffer=false;
    outTextRgn.setId(MemoryId::WLOutId);
    thisSubj->SetData(outTextRgn.getMsgForm());
    thisSubj->NotifyObservers();
    //cout << "SharedMemID sent" << endl;
  }
}

void
WLOut::requestRegions() {
  const int main_out_id  =MemoryId::MainOutId;
  const int motion_out_id=MemoryId::MotionOutId;
  const int logger_out_id=MemoryId::LoggerOutId;

  if(!mainOutMemRgn.validData() && !requestedRegion[main_out_id]) {
    requestedRegion[main_out_id]=true;
    RequestInfo request_info;
    request_info.region_id=main_out_id;
    request_info.obs_id=observer[obsSharedMemRegionInfo]->GetObserverID();
    subject[sbjRequestRegion]->SetData(&request_info,sizeof(RequestInfo));
    subject[sbjRequestRegion]->NotifyObservers();
  }

  if(!motionOutMemRgn.validData() && !requestedRegion[motion_out_id]) {
    requestedRegion[motion_out_id]=true;
    RequestInfo request_info;
    request_info.region_id=motion_out_id;
    request_info.obs_id=observer[obsSharedMemRegionInfo]->GetObserverID();
    subject[sbjRequestRegion]->SetData(&request_info,sizeof(RequestInfo));
    subject[sbjRequestRegion]->NotifyObservers();
  }

  if(!loggerOutMemRgn.validData() && !requestedRegion[logger_out_id]) {
    requestedRegion[logger_out_id]=true;
    RequestInfo request_info;
    request_info.region_id=logger_out_id;
    request_info.obs_id=observer[obsSharedMemRegionInfo]->GetObserverID();
    subject[sbjRequestRegion]->SetData(&request_info,sizeof(RequestInfo));
    subject[sbjRequestRegion]->NotifyObservers();
  }
}

// Ready notification for the request-region subject. Only records that a
// message arrived (see the gotMessage handling in processBuffer()).
// `event` is not used.
void
WLOut::ReadyRequestRegion(const OReadyEvent &event) {
  gotMessage = true;
}

void
WLOut::GotMemRegion(const ONotifyEvent &event) {
  gotMessage = true;

  int event_num_data = event.NumOfData();
  for(int event_data_id=0; event_data_id<event_num_data; event_data_id++) {
    SMMSharedMemRegion region_info;
    region_info.init((SMMSharedMemRegion::MsgForm)event.RCData(event_data_id));
    switch(region_info.getId()) {
    case MemoryId::MainOutId:
      {
        mainOutMemRgn=region_info;
        
        mainOutPSC = (PacketStreamCollection *)mainOutMemRgn.getData();
        mux->addPSC(mainOutPSC);
      }
    break;

    case MemoryId::MotionOutId:
      {
        motionOutMemRgn=region_info;
        
        motionOutPSC = (PacketStreamCollection *)motionOutMemRgn.getData();
        mux->addPSC(motionOutPSC);
      }
    break;

    case MemoryId::LoggerOutId:
      {
        loggerOutMemRgn=region_info;
        
        loggerOutPSC = (PacketStreamCollection *)loggerOutMemRgn.getData();
        mux->addPSC(loggerOutPSC);
      }
    break;
    }
  }

  observer[obsSharedMemRegionInfo]->AssertReady();
}

/* Send the buffered data over the wireless network
   in the form of data on a TCP stream.

   Ready-event handler: emits a trace line when `debug` is set and then
   runs processBuffer() directly (note: unlike proc(), this path does not
   check listen_done). `event` is not used.
*/
void WLOut::NetTransmit(const OReadyEvent &event) 
{
  if(debug) pprintf(TextOutputStream,"WL:got net ready\n");

  // sendStringData("In NetTransmit.\n");
  processBuffer();
}

// Timer entry point (registered by startTimer() through
// Extra_Entry[entryProcessBuffer]). Marks the one-shot timer as no longer
// pending and hands over to proc(), which only processes output once a
// connection has been established. `msg` is not used.
void
WLOut::ProcessBuffer(void *msg) {
  if(debug) pprintf(TextOutputStream,"WL:got timer msg\n");

  //cout << "processing buffer" << endl;
  timerStarted=false;

  proc();
}

// Main output pump.
//
// When output is enabled: first drains any data still queued from an
// earlier pass, then pulls packets from the multiplexer and queues/sends
// them until the multiplexer is empty or the time budget is used up.
// Afterwards whatever is left in the network buffer is flushed.
//
// Every path on which sendBuffers()/sendPacket()/flush() report true
// returns immediately WITHOUT starting the timer: data has been handed to
// the TCP layer and SendCont() calls processBuffer() again when that send
// completes. Only when nothing was sent is the timer armed so that the
// pump runs again later.
void
WLOut::processBuffer() {
  //state += 0x1000000;

  if(OutputOn) {
    static EventTimeReporter reporter("WLOut::ProcessBuffer",SecToTime(5.0),SecToTime(100.0),100UL,&TextOutputStream);
    EventTimeReporter::EventTimer timer(&reporter,config.spoutConfig.dumpProfile);

    // sendStringData("   Output is on.\n");
    // The staging packet and its data buffer are allocated once and reused
    // on every call (function-local statics).
    static RobotDataPacket packet;
    static bool initialized=false;

    // Finish sending data queued by a previous pass before fetching more.
    if(sendBuffers()==true)
      return;
    
    if(!initialized) {
#ifdef PLATFORM_APERIOS
      NewRegion(sizeof(uchar)*MaxPacketLength, reinterpret_cast<void**>(&packet.data)); 
#else
      packet.data = new uchar[MaxPacketLength];
#endif
      packet.contentType = RobotDataPacket::EMPTY;
      initialized = true;
    }
    
    // Consume the "message arrived since last pass" flag; it selects the
    // shorter of the two time budgets below.
    quickCheck = gotMessage;
    gotMessage = false;
    
    ulong start_time = GetTime();
    bool cont=true;
    // NOTE(review): budget is in the units returned by GetTime(),
    // presumably microseconds (1 s vs. 5 s) -- confirm.
    ulong check_cutoff = quickCheck ? 1*1000000 : 5*1000000;
    //state += 0x100;
    while(cont && mux->getNextPacket(&packet,MaxPacketLength)) {
      // These locals intentionally shadow the outer reporter/timer and
      // time each individual packet.
      static EventTimeReporter reporter("WLOut process packet",SecToTime(5.0),SecToTime(100.0),100000UL,&TextOutputStream);
      EventTimeReporter::EventTimer timer(&reporter,config.spoutConfig.dumpProfile);

      // sendStringData("  Sending packet.\n");
      if(sendPacket(&packet)==true)
        return;
      
      // Stop fetching packets once the time budget is exceeded.
      cont = (GetTime() - start_time <= check_cutoff);
      //cont = false;
    } // end while loop
    
  } // end if OutputOn
  
  // we're done putting packets in the buffer, so try
  // to send them over the network.
  if(flush()==true)
    return;
  
  // Nothing was sent, so no SendCont() will follow: poll again via timer.
  startTimer();
}

// Arm the one-shot timer that calls ProcessBuffer().
//
// Does nothing when a timer is already pending. If the timer cannot be
// registered, the error is reported and the pending flag is cleared again
// so that a later call can retry; otherwise the flag would stay set
// forever and the timer could never be armed again.
void
WLOut::startTimer() {
  if(timerStarted)
    return;
  timerStarted=true;

  // Placeholder message for SetTimeEvent(); zero bytes of it are passed.
  int dummy = 0;

  EventID eid;
  RelativeTime period(0,100);
  TimeEventInfoWithRelativeTime time_info(TimeEventInfo::NonPeriodic, period);

  sError error;
  error=SetTimeEvent(&time_info,myOID_,Extra_Entry[entryProcessBuffer],&dummy,0,&eid);
  if(error!=sSUCCESS) {
    cout << "error :" << error << ":" << endl;
    // No timer is pending after a failed registration.
    timerStarted=false;
  }
}

/* Queues the packet header information for sending.
   On the wire headers look like:
   0x80, dataType, 4 bytes length, 4 bytes of timestamp
   in host order.

   The header is assembled in headBuffer and queued with bufferData();
   only a pointer is queued, so headBuffer must not be rewritten until
   the queued header has been copied out by sendBuffers().

   FIXME: We need to think about byte ordering. Since data can end up on
   arbitrary architectures after it goes over the network, trusting that
   our byte ordering is the same as theirs is going to be Bad.  On the other
   hand this works for all the systems we currently use except the Mac and
   fixing this requires us to know about the internal structure of the data.
   We would have to fix this in the encoding and decoding steps and not in
   the sending step.
*/
void
WLOut::bufferHeader(RobotDataPacket *packet) {
  // Write position inside headBuffer.
  uchar *cursor = headBuffer;

  *cursor++ = (uchar)'\x80';
  *cursor++ = packet->dataType;

  memcpy(cursor,&packet->length,sizeof(packet->length));
  cursor += sizeof(packet->length);

  memcpy(cursor,&packet->timestamp,sizeof(packet->timestamp));
  cursor += sizeof(packet->timestamp);

  bufferData(headBuffer,cursor - headBuffer);
}

// Append a (pointer, length) pair to the queue of buffers waiting to be
// copied into the network buffer. The data itself is not copied here.
//
// If the queue is already full, the most recently queued entry is
// overwritten instead of writing past the end of the arrays.
void WLOut::bufferData(uchar *data, ulong length)
{
  const bool queue_full = (numBufs == MaxDataBufs);
  if(queue_full) {
    // disaster, try to muddle through without crashing
    --numBufs;
  }

  dataLength[numBufs] = length;
  dataBuf   [numBufs] = data;
  ++numBufs;
  //state &= ~0xF0000000UL; state += numBufs*0x10000000UL;
}

// Queue one packet (header, plus payload for TEXT/BINARY content) and try
// to push the queued data towards the network.
//
// Returns the result of sendBuffers(): true when a network buffer was
// handed off for sending, false when everything queued was absorbed.
bool
WLOut::sendPacket(RobotDataPacket *packet) {
  bufferHeader(packet);

  // Any other content type (e.g. EMPTY) should not occur and carries no
  // payload.
  const bool has_payload =
    packet->contentType == RobotDataPacket::TEXT ||
    packet->contentType == RobotDataPacket::BINARY;
  if(has_payload) {
    bufferData(packet->data,packet->length);
  }

  return sendBuffers();
}

// Hand the contents of the network staging buffer to connection 0.
//
// Returns false without doing anything when the buffer holds no payload
// beyond the reserved header area. Otherwise the message header is written
// at the front of the buffer, the buffer is copied into the connection's
// send buffer, Send(0) is issued and the staging buffer is reset; the
// function then returns true. The result of Send(0) is not inspected.
bool WLOut::sendNetBuffer() {
  // Nothing buffered beyond the header area.
  if(netBufIndex <= NetworkBufferDataStart) {
    return false;
  }

  /* Prepare header. */
  NetMsgHeader header;
  header.totalLength       = netBufIndex;
  header.senderID          = SenderID;
  header.senderSubsystem   = NetMsgHeader::Sub_WLO;
  header.receiverID        = NetMsgHeader::Linux;
  header.receiverSubsystem = NetMsgHeader::Sub_Main;
  header.messageType       = NetMsgHeader::WLOStream;
  memcpy(netBuffer, &header, sizeof(header));

  if(debug) {
    pprintf(TextOutputStream,"WL:sending %d bytes data\n",header.totalLength);
  }

  // send data over network
  memcpy((unsigned char*)connection[0].sendData, netBuffer, header.totalLength);
  connection[0].sendSize = header.totalLength;
  Send(0);

  // Start refilling right after the header area.
  netBufIndex = NetworkBufferDataStart;
  return true;
}

// Copy queued data buffers into the network staging buffer, sending the
// staging buffer whenever it becomes full.
//
// Returns true as soon as a full staging buffer has been handed off with
// sendNetBuffer() (the caller must then wait for the send to complete), or
// false once the queue of data buffers is empty.
bool WLOut::sendBuffers() 
{
  if(debug) pprintf(TextOutputStream,"WL:send buffers\n");
  while(true) {
    if(netBufIndex == NetworkBufferSize) {
      if(debug) pprintf(TextOutputStream,"WL:full buffer, sending data\n");
      
      if(sendNetBuffer())
        break;
      
      //state = 0;
    } else {
      if(numBufs != 0) {
        ulong copy_size;
        
        // Copy as much of the first queued buffer as still fits.
        copy_size = min(dataLength[0], NetworkBufferSize - netBufIndex);
        memcpy(&netBuffer[netBufIndex],dataBuf[0],copy_size);
        netBufIndex += copy_size;
        dataBuf   [0] += copy_size;
        dataLength[0] -= copy_size;
        if(dataLength[0] == 0) {
          // First buffer fully consumed: drop it and shift the remaining
          // entries down. The count is decremented first so that the slot
          // cleared below is the one vacated by the shift; clearing index
          // `numBufs` before the decrement wrote one element past the
          // arrays when the queue was full (numBufs == MaxDataBufs).
          numBufs--;
          if(numBufs > 0) {
            memmove(&dataLength[0],&dataLength[1],numBufs*sizeof(dataLength[0]));
            memmove(&dataBuf   [0],&dataBuf   [1],numBufs*sizeof(dataBuf[0]));
          }
          dataBuf   [numBufs] = NULL;
          dataLength[numBufs] = 0;
          //state &= ~0xF0000000UL; state += numBufs*0x10000000UL;
        }
        
        if(debug) pprintf(TextOutputStream,"WL:copied %lu bytes data, %d bufs outstanding\n",copy_size,numBufs);
        
        //state += 1;
      } else {
        //state += 0x10000;
        if(debug) pprintf(TextOutputStream,"WL:sb, no more data, returning\n");
        
        return false;
      }
    }
  }

  if(debug) pprintf(TextOutputStream,"WL:sb waiting for net ready\n");
      
  return true;
}

// Push everything that is pending towards the network.
//
// First drains the queued data buffers; if that already handed off a full
// network buffer, returns true. Otherwise sends whatever partial content
// the network buffer holds.
//
// Returns true when a buffer was handed off for sending (the caller should
// wait for the send to complete) and false when there was nothing to send.
bool WLOut::flush()
{
  if(sendBuffers()==true)
    return true;

  if(debug) pprintf(TextOutputStream,"WL:flushing\n");
  if(!sendNetBuffer())
    return false;

  // This trace line used to sit after an unconditional return and was
  // never reached.
  if(debug) pprintf(TextOutputStream,"WL:flush waiting for net ready\n");

  return true;
}

//*****************************************
// derived from ECHOSERVER CODE:

// Create a TCP endpoint for connection slot `index` and start listening
// on ECHOSERVER_PORT. Completion is reported to ListenCont().
//
// Returns oFAIL when the slot is not in the CLOSED state or the endpoint
// cannot be created; otherwise the slot becomes LISTENING and oSUCCESS is
// returned.
OStatus
WLOut::Listen(int index)
{
  OSYSDEBUG(("WLOut::Listen()\n"));

  // Only a closed slot may start listening.
  if(connection[index].state != CONNECTION_CLOSED) {
    return oFAIL;
  }

  //
  // Create endpoint
  //
  antEnvCreateEndpointMsg create_msg(EndpointType_TCP,
                                     ECHOSERVER_BUFFER_SIZE * 2);
  create_msg.Call(ipstackRef, sizeof(create_msg));
  if(create_msg.error != ANT_SUCCESS) {
    OSYSLOG1((osyslogERROR, "%s : %s[%d] antError %d",
              "EchoServer::Listen()",
              "Can't create endpoint",
              index, create_msg.error));
    return oFAIL;
  }
  connection[index].endpoint = create_msg.moduleRef;

  //
  // Listen
  //
  TCPEndpointListenMsg listen_request(connection[index].endpoint,
                                      IP_ADDR_ANY, ECHOSERVER_PORT);
  // The slot index travels with the request and is read back in
  // ListenCont().
  listen_request.continuation = (void*)index;

  listen_request.Send(ipstackRef, myOID_,
                      Extra_Entry[entryListenCont], sizeof(listen_request));

  connection[index].state = CONNECTION_LISTENING;

  return oSUCCESS;
}

void
WLOut::ListenCont(void* msg)
{
  OSYSDEBUG(("EchoServer::ListenCont()\n"));
  
  TCPEndpointListenMsg* listenMsg = (TCPEndpointListenMsg*)msg;
  int index = (int)listenMsg->continuation;
  
  if (listenMsg->error != TCP_SUCCESS) {
    OSYSLOG1((osyslogERROR, "%s : %s %d",
              "EchoServer::ListenCont()",
              "FAILED. listenMsg->error", listenMsg->error));
    Close(index);
    return;
  }
  
  connection[index].state = CONNECTION_CONNECTED;
  listen_done = true;
  
  proc();
}

// Start sending the first sendSize bytes of the slot's send buffer.
// Completion is reported to SendCont().
//
// Returns oFAIL when there is nothing to send or the slot is not in the
// CONNECTED state; otherwise the slot becomes SENDING, sendSize is reset
// to zero and oSUCCESS is returned.
OStatus
WLOut::Send(int index)
{
  OSYSDEBUG(("EchoServer::Send()\n"));

  const bool can_send =
    connection[index].sendSize != 0 &&
    connection[index].state == CONNECTION_CONNECTED;
  if(!can_send) {
    return oFAIL;
  }

  TCPEndpointSendMsg send_request(connection[index].endpoint,
                                  connection[index].sendData,
                                  connection[index].sendSize);
  // The slot index travels with the request and is read back in SendCont().
  send_request.continuation = (void*)index;

  send_request.Send(ipstackRef, myOID_,
                    Extra_Entry[entrySendCont],
                    sizeof(TCPEndpointSendMsg));

  connection[index].sendSize = 0;
  connection[index].state    = CONNECTION_SENDING;

  return oSUCCESS;
}

// Continuation of Send(): called when a send request completes.
//
// On failure the error is logged and the slot is closed. On success the
// slot returns to the CONNECTED state and, while a connection is
// established (listen_done), the output pump runs again to send the next
// chunk of data.
void
WLOut::SendCont(void* msg)
{
  OSYSDEBUG(("EchoServer::SendCont()\n"));

  TCPEndpointSendMsg* sendMsg = (TCPEndpointSendMsg*)msg;
  int index = (int)(sendMsg->continuation);

  if(sendMsg->error != TCP_SUCCESS){
    OSYSLOG1((osyslogERROR, "%s : %s %d",
              "EchoServer::SendCont()",
              "FAILED. sendMsg->error", sendMsg->error));
    Close(index);
    return;
  }

  // The send buffer holds a binary message (header plus packet data) that
  // is not NUL-terminated, so it must not be printed with "%s".

  connection[index].state = CONNECTION_CONNECTED;

  if(listen_done)
    processBuffer();
}

// Post a receive request on connection slot `index`. Completion is
// reported to ReceiveCont().
//
// Returns oFAIL unless the slot is CONNECTED or SENDING, otherwise
// oSUCCESS.
OStatus
WLOut::Receive(int index)
{
  OSYSDEBUG(("EchoServer::Receive()\n"));

  const bool slot_usable =
    connection[index].state == CONNECTION_CONNECTED ||
    connection[index].state == CONNECTION_SENDING;
  if(!slot_usable) {
    return oFAIL;
  }

  TCPEndpointReceiveMsg receive_request(connection[index].endpoint,
                                        connection[index].recvData,
                                        1, connection[index].recvSize);
  // The slot index travels with the request.
  receive_request.continuation = (void*)index;

  receive_request.Send(ipstackRef, myOID_,
                       Extra_Entry[entryReceiveCont], sizeof(receive_request));

  return oSUCCESS;
}

// Run the output pump, but only once a connection has been established.
// Clears the timer-pending flag before doing so.
void WLOut:: proc()
{
  if(listen_done) {
    timerStarted = false;
    processBuffer();
  }
}

// Continuation for receive requests posted by Receive(). The received
// data is not examined; the handler only sets listen_done, which enables
// output processing in proc() and SendCont(). `msg` is not used.
void
WLOut::ReceiveCont(void* msg)
{
  listen_done = true;
}

// Start closing connection slot `index`. Completion is reported to
// CloseCont(), which listens for a new connection.
//
// Returns oFAIL when the slot is already CLOSED or CLOSING; otherwise the
// slot becomes CLOSING, output is disabled through listen_done and
// oSUCCESS is returned.
OStatus
WLOut::Close(int index)
{
  OSYSDEBUG(("EchoServer::Close()\n"));

  const bool close_possible =
    connection[index].state != CONNECTION_CLOSED &&
    connection[index].state != CONNECTION_CLOSING;
  if(!close_possible) {
    return oFAIL;
  }

  TCPEndpointCloseMsg close_request(connection[index].endpoint);
  // The slot index travels with the request and is read back in
  // CloseCont().
  close_request.continuation = (void*)index;

  close_request.Send(ipstackRef, myOID_,
                     Extra_Entry[entryCloseCont], sizeof(close_request));

  connection[index].state = CONNECTION_CLOSING;

  // If we've just closed our connection, we need to listen
  // for a new one.
  listen_done = false;

  return oSUCCESS;
}

// Continuation of Close(): called when the close request completes.
// Marks the slot CLOSED and immediately starts listening on it again.
// The error field of the reply is not inspected.
void
WLOut::CloseCont(void* msg)
{
  OSYSDEBUG(("EchoServer::CloseCont()\n"));
    
  TCPEndpointCloseMsg* closeMsg = (TCPEndpointCloseMsg*)msg;
  // Slot index stored in the request by Close().
  int index = (int)(closeMsg->continuation);

  connection[index].state = CONNECTION_CLOSED;
  Listen(index);
}

// Prepare connection slot `index`: mark it CLOSED and allocate and map
// its shared send and receive buffers (ECHOSERVER_BUFFER_SIZE each).
//
// Returns oFAIL (after logging) if either buffer cannot be allocated,
// otherwise oSUCCESS.
OStatus
WLOut::InitTCPConnection(int index)
{
  OSYSDEBUG(("EchoServer::InitTCPConnection()\n"));

  connection[index].state = CONNECTION_CLOSED;

  // 
  // Allocate send buffer
  //
  antEnvCreateSharedBufferMsg send_alloc(ECHOSERVER_BUFFER_SIZE);
  send_alloc.Call(ipstackRef, sizeof(send_alloc));
  if(send_alloc.error != ANT_SUCCESS) {
    OSYSLOG1((osyslogERROR, "%s : %s[%d] antError %d",
              "EchoServer::InitTCPConnection()",
              "Can't allocate send buffer",
              index, send_alloc.error));
    return oFAIL;
  }

  // Map the buffer into our address space and remember its address.
  connection[index].sendBuffer = send_alloc.buffer;
  connection[index].sendBuffer.Map();
  connection[index].sendData = (byte*)(connection[index].sendBuffer.GetAddress());

  //
  // Allocate receive buffer
  //
  antEnvCreateSharedBufferMsg recv_alloc(ECHOSERVER_BUFFER_SIZE);
  recv_alloc.Call(ipstackRef, sizeof(recv_alloc));
  if(recv_alloc.error != ANT_SUCCESS) {
    OSYSLOG1((osyslogERROR, "%s : %s[%d] antError %d",
              "EchoServer::InitTCPConnection()",
              "Can't allocate receive buffer",
              index, recv_alloc.error));
    return oFAIL;
  }

  connection[index].recvBuffer = recv_alloc.buffer;
  connection[index].recvBuffer.Map();
  connection[index].recvData = (byte*)(connection[index].recvBuffer.GetAddress());

  return oSUCCESS;
}

