/* LICENSE:
  =========================================================================
    CMPack'04 Source Code Release for OPEN-R SDK 1.1.5-r2 for ERS7
    Copyright (C) 2004 Multirobot Lab [Project Head: Manuela Veloso]
    School of Computer Science, Carnegie Mellon University
    All rights reserved.
  ========================================================================= */
// FIXME: use struct timeval to prevent wraparound issues, or else be
// intelligent about it

#include <OPENR/core_macro.h>
#include <iostream>
#include <ERA201D1.h>

#include "CommMgrObject.h"
#include "Util.h"
#include "../headers/CircBufPacket.h"
#include "../headers/SystemUtility.h"
#include "../headers/WaveLAN.h"

using namespace std;
using namespace CommMgr;

// Size in bytes of the data area that DoStart() reserves directly behind the
// PacketStreamCollection header in the output memory region.
static const int CommMgrOutputDataSize = 1024*1024; // 1MB
// First argument passed to allocateStream() for the text output stream.
static const int TotalTextOutput = 64*1024; // 64KB
// Second argument passed to allocateStream() for the text output stream.
static const int MaxTextMessages = 256; // maximum number of in flight text messages
// Stream that debug() and the pprintf() calls in this file write to.
// It stays NULL until DoStart() assigns it.
PacketStream *TextOutputStream = NULL;

// Gates the optional diagnostic debug() calls in this file.
static const bool print_debug = true;

// Appends one line of the form "<msg>: <data>" to /MS/commmgr.txt.
//
//   msg  - label printed before the value
//   data - value printed as an unsigned long
//
// If the log file cannot be opened the call does nothing; it is a
// best-effort debugging aid and must not take the object down.
void lame_o_print(char *msg, ulong data) {
  static int counter = 0;

  //  if(!(counter % 1)) {
    FILE *outfile = fopen("/MS/commmgr.txt", "at");
    // fopen() returns NULL on failure (e.g. no memory stick mounted);
    // writing to or closing a NULL stream would crash.
    if (outfile != NULL) {
      fprintf(outfile, "%s: %lu\n", msg, data);
      fclose(outfile);
    }
    // }

  counter++;
}

// Writes msg followed by a newline to TextOutputStream via pprintf().
// NOTE(review): TextOutputStream is NULL until DoStart() has run; whether
// pprintf() tolerates a NULL stream is not visible here -- confirm.
void debug(const char* msg) {
  pprintf(TextOutputStream, "%s\n", msg);
}

// Convenience overload: forwards the string's character data to the
// const char* version of debug().
void debug(string msg) {
  const char *text = msg.c_str();
  debug(text);
}

// Emits "<msg>: <current time>" through debug().
//
//   msg - label printed before the time string
//
// The time comes from GetTime() and is formatted by timetostring().
void debugtime(const char* msg) {
  char out[1024];
  unsigned long t = GetTime();
  // The temporary returned by timetostring() only lives until the end of
  // the full expression, so its c_str() is consumed in this same statement
  // instead of being stored in a pointer.  snprintf() bounds the write so
  // a long msg cannot overrun out[].
  snprintf(out, sizeof(out), "%s: %s", msg, timetostring(t).c_str());
  debug(out);
}

// Init entry point of the object.  Runs the three setup macros below,
// configures the buffer control parameters of the two message observers,
// and always returns oSUCCESS.
// NOTE(review): the macros come from the OPEN-R headers; their exact
// expansion is not visible in this file.
OStatus
CommMgrObject::DoInit(const OSystemEvent& event) 
{

  NEW_ALL_SUBJECT_AND_OBSERVER;
  REGISTER_ALL_ENTRY;
  SET_ALL_READY_AND_NOTIFY_ENTRY;
  
  // make sure the library doesn't drop data "for" us
  // on this reliable communication channel
  // NOTE(review): the meaning of the (0,1,16) arguments is defined by the
  // observer class and is not visible here -- confirm before changing.
  observer[obsSendMessage]->SetBufCtrlParam(0,1,16);
  observer[obsBCastMessage]->SetBufCtrlParam(0,1,16);

  return oSUCCESS;
}

// Start entry point of the object.  Enables the subjects/observers, carves
// the output memory region into a PacketStreamCollection header plus data
// area, allocates the text output stream, then runs init() and starts the
// periodic timer.  Always returns oSUCCESS.
OStatus
CommMgrObject::DoStart(const OSystemEvent& event)
{
  ENABLE_ALL_SUBJECT;
  ASSERT_READY_TO_ALL_OBSERVER;

  // One region holds the collection header followed by its data area.
  outMemRgn.init(sizeof(PacketStreamCollection)+CommMgrOutputDataSize);
  void *region = outMemRgn.getData();

  // The header lives at the start of the region; stream data follows it.
  PacketStreamCollection *streams = (PacketStreamCollection *)region;
  uchar *data_begin = ((uchar *)region) + sizeof(PacketStreamCollection);
  uchar *data_end = data_begin + CommMgrOutputDataSize;
  streams->init(data_begin, data_end);

  // Set up the stream that debug()/pprintf() write to.
  const int text_id =
    streams->allocateStream(TotalTextOutput,MaxTextMessages);
  TextOutputStream = streams->getStream(text_id);
  TextOutputStream->binary = false;
  TextOutputStream->type   = SPOutEncoder::TextLead;

  init();
  startTimer();

  return oSUCCESS;
}

// Stop entry point of the object.  Runs the two teardown macros below and
// always returns oSUCCESS.
// NOTE(review): the timer started by startTimer() is not cancelled here;
// confirm whether that is intentional.
OStatus
CommMgrObject::DoStop(const OSystemEvent& event)
{
  
  DISABLE_ALL_SUBJECT;
  DEASSERT_READY_TO_ALL_OBSERVER;
  
  return oSUCCESS;
}

// Destroy entry point of the object.  Runs the macro that deletes the
// subjects and observers and always returns oSUCCESS.
OStatus
CommMgrObject::DoDestroy(const OSystemEvent& event)
{
  DELETE_ALL_SUBJECT_AND_OBSERVER;
  return oSUCCESS;
}

void CommMgrObject::receivedData(int ap_index, byte *data, int bytes) {

 
  if (bytes == 0) {
    //char msg[64];
    //sprintf(msg, "Error in Receive: error code %d\n", conn->Error);
    //debug(msg);
    if(print_debug)
      debug("Hey, receiveData called with empty buffer. ::frown::\n");
    return;
  }

  if(ap_index < 0 ||
     ap_index > (int)all_peers.size()) {
    if(print_debug)
      debug("Message from unknown peer");
    return;
  }

  // Otherwise get the peer out of the vector
  Peer *peer = &all_peers[ap_index];

  data[bytes] = 0;
  char *msg = (char *)data;

  if (strstr(msg, "sync") != msg) {
    // OK, this is presumably a normal message, send it along to
    // whoever's listening

    // FIXME: how do we check that this is, in fact, a NetMsgHeader
    // and not something totally arbitrary that will crash us?
    NetMsgHeader *header = (NetMsgHeader*) msg;

    if(bytes < header->totalLength) {
      if(print_debug)
	debug("NetMsgHeader says it contains more data than came in over net");
    }

    header->timestamp = convertfrom(peer, header->timestamp);

    // If we think that the message originated in the future due to
    // variation in the connection latency, we clip to the present.
    // We are replacing a larger error with a smaller one ('cuz we
    // know the latency was > 0) 
    if(header->timestamp > GetTime())
      header->timestamp = GetTime();

    subject[sbjGotMessage]->SetData(msg, header->totalLength);
    subject[sbjGotMessage]->NotifyObservers();
    return;
  }

  unsigned long local = GetTime();

  // if this is a sync reply packet...
  if (strstr(msg, "syncreply") == msg) {
    unsigned long seqnum;
    if (sscanf(msg, "syncreply %lu\n", &seqnum) != 1) {
      if (print_debug) {
        debug("Error parsing syncreply packet");
      }
      return;
    }

    // Try to find it in the array
    bool foundMatch = false;
    for (int i = 0; i < SEQTIMEHISTORYLEN; i++) {
      SeqTime s = send_times[i];
      if (s.seq == seqnum) {
        ulong roundtrip_delay = local - s.time;
        peer->onewayLatency = roundtrip_delay / 2;

        //FILE* fp = fopen("/MS/synch.log", "a");
        //fprintf(fp, "oneway latency: %u\n", peer->onewayLatency);
        //fclose(fp);

        foundMatch = true;
        break;
      }
    }
    // either we got a corrupt packet or REALLY bad latency (on the
    // order of SEQTIMEHISTORYLEN seconds). Not sure what to do here,
    // just logging it for the moment.
    if (!foundMatch) {
      pprintf(TextOutputStream, 
              "Didn't find syncreply in send_times: %s\n", msg);
    }
    return;
  }
  

  // otherwise, this is a sync packet

  // Get remote timestamp and sequence number, as ulongs
  unsigned long seqnum;
  unsigned long remote;
  if (sscanf(msg, "sync %lu %lu\n", &seqnum, &remote) != 2) {
    if(print_debug)
      debug("Error parsing sync packet");
    return;
  }

  // send a response back to that peer. this is done right after
  // getting the time, so as to minimize round-trip time. 
  {
    char msg[256];
    snprintf(msg, 256, "syncreply %lu\n", seqnum);    
    send(peer->rank, msg, strlen(msg));
  }
  

  // Record this as a last-received time from our peer
  peer->last_recv = local;

  // If this is the first time we've ever received a synch packet from
  // this peer, keep track of our local time and the remote time (this
  // is only used for debugging/testing/performance analysis
  // purposes)
  if (peer->initialLocalTime == 0) {
    peer->initialLocalTime = local;
    peer->initialTime = remote;
  }

  // We assume that the latency in receiving this message is equal to
  // our current one-way latency estimate. So we want to add the
  // latency to the remote time, because that's approximately what
  // time the other robot's clock *currently* reads.
  remote += peer->onewayLatency;

  // Actually calculate our new offset estimate
  long diff = remote - local;
  peer->offsets->enqueue(diff);
  peer->offsetEstimate = mean(peer->offsets);

  // Calculate current error in milliseconds between the time we would
  // have sent to the sender and the time as reported by the sender
  unsigned long converted = convertto(peer, local);
  long error;
  bool errorPositive;
  if (converted > remote) {
    error = (converted - remote) / 1000;
    errorPositive = true;
  }
  else {
    error = (remote - converted) / 1000;
    errorPositive = false;
  }
  peer->error = error;
  
  // Update status of this peer based on absolute error
  long abserror = labs(error);
  switch (peer->status) {
  case Peer::OFF:
  case Peer::INITIAL:
    peer->status = Peer::GOOD;
    break;
  case Peer::DISCONNECTED:
    peer->status = Peer::POOR;
    break;
  case Peer::POOR:
    if (abserror > SIGMA) {
      peer->goodWait = GOOD_HYSTERESIS;
    }
    else {
      if (peer->goodWait == 0) {
        peer->status = Peer::GOOD;
      }
      else {
        peer->goodWait--;
      }
    }
    break;
  case Peer::GOOD:
    if (abserror > SIGMA) {
      peer->goodWait = GOOD_HYSTERESIS;
      peer->status = Peer::POOR;
    }
    break;
  }

  // Send peer status data to observers
  PeerStatus ps; 
  ps.peerID = peer->id;
  ps.status = getPeerStatus(peer);
  subject[sbjPeerStatus]->SetData(&ps,sizeof(PeerStatus));
  subject[sbjPeerStatus]->NotifyObservers();

  // Send net status data to observers
  NetStatus ns = getNetStatus();
  subject[sbjNetStatus]->SetData(&ns, sizeof(NetStatus));
  subject[sbjNetStatus]->NotifyObservers();

  // Error we would have encountered if we had only synched once at
  // the beginning of the run - used for debugging/analysis
  /*  
  long default_error = 
    ((local - peer->initialLocalTime) - 
    (remote - peer->initialTime)) / 1000;
  
  // Log some information for purposes of debugging/analysis...
  // offset is in seconds, errors in ms

  char mesg[1024];
  snprintf(mesg,
           1024,
           "%s local=%ld remote=%ld diff=%ld converted=%ld offset=%f error=%ld default_error=%ld\n", 
           peer->addrstring.c_str(),
           local,
           remote,
           diff,
           converted, 
           peer->offsetEstimate / 1e6,
           error,
           default_error);
  debug(mesg);
  */
}

// Summarises the state of the whole network from the per-peer states.
//
// Returns DISCONNECTED if any peer is disconnected
// Otherwise returns POOR if connection to any node is poor
// Otherwise returns GOOD
CommMgr::NetStatus CommMgrObject::getNetStatus() {

  // A POOR peer must not end the scan: a DISCONNECTED peer later in the
  // vector still has to win, so POOR is only remembered here.
  bool anyPoor = false;

  for (unsigned int i = 0; i < all_peers.size(); i++) {
    if (all_peers[i].status == Peer::DISCONNECTED) {
      return CommMgr::DISCONNECTED;
    }
    if (all_peers[i].status == Peer::POOR) {
      anyPoor = true;
    }
  }

  return anyPoor ? CommMgr::POOR : CommMgr::GOOD;
}

// Looks the peer up by robot ID and reports its status; the Peer* overload
// handles the case where the lookup yields NULL.
CommMgr::NetStatus CommMgrObject::getPeerStatus(int id) {
  return getPeerStatus(getPeerByID(id));
}

// Maps a peer's internal status onto the public NetStatus values:
// POOR and GOOD pass through, everything else (including a NULL peer)
// is reported as DISCONNECTED.
CommMgr::NetStatus CommMgrObject::getPeerStatus(Peer *p) {
  if (p != NULL) {
    if (p->status == Peer::POOR) {
      return CommMgr::POOR;
    }
    if (p->status == Peer::GOOD) {
      return CommMgr::GOOD;
    }
  }

  return CommMgr::DISCONNECTED;
}

// Allocates the send-time history used to match syncreply packets and
// starts the sync sequence numbering at zero.
// NOTE(review): whether the SeqTime elements are initialised by this
// allocation depends on SeqTime's definition, which is not visible here.
CommMgrObject::CommMgrObject() {
  send_times = new SeqTime[SEQTIMEHISTORYLEN];

  next_seq_num = 0;
}

// Releases the send-time history array allocated in the constructor.
CommMgrObject::~CommMgrObject() {
  delete[] send_times;
  // Clear the pointer so a stale address is never left behind.
  send_times = NULL;
}

// Returns the peer registered under robot ID id, or NULL (after logging a
// warning) when no such robot is known.
//
//   id - robot ID; values outside 0..MAX_ID are treated as unknown
Peer* CommMgrObject::getPeerByID(int id) {
  // loadDictionary() fills idToRankMap for IDs 0..MAX_ID only, so any other
  // ID must not be used as an index.
  const int rank = (id >= 0 && id <= MAX_ID) ? idToRankMap[id] : -1;
  if (rank == -1) {
    pprintf(TextOutputStream, 
            "CommMgrObject warning: trying to do something with a robot with unknown ID %d\n",
            id);
    return NULL;
  }
  return &all_peers[rank];
}

// Sends the text of msg (without its terminating NUL) to the peer at
// index ap_index.
void CommMgrObject::sendSynchPacket(int ap_index, const char* msg) {
  const size_t length = strlen(msg);
  send(ap_index, msg, length);
}

// Timer callback (scheduled by startTimer()).  Builds one "sync" packet
// carrying the next sequence number and the current local time, records its
// send time for round-trip measurement, marks silent peers as DISCONNECTED
// and sends the packet to the other peers.
//
//   unused - not read
void CommMgrObject::SyncTimer(void* unused) {
  static int heartbeats = 0;
  unsigned long t = GetTime();
  
  // FIXME: should we use a real NetMsgHeader?
  // The receiver parses the sequence number with %lu, so it is printed as
  // an unsigned long here; the cast keeps the format correct whatever the
  // declared width of next_seq_num is.
  char msg[256];
  snprintf(msg, sizeof(msg), "sync %lu %s\n",
           (unsigned long)next_seq_num, timetostring(t).c_str());

  // record when we sent this packet - used for determining round-trip
  // time when we get a reply
  send_times[next_seq_num % SEQTIMEHISTORYLEN].seq = next_seq_num;
  send_times[next_seq_num % SEQTIMEHISTORYLEN].time = t;

  next_seq_num++;

  if(print_debug)
    debug(msg);
  
  for (unsigned int i = 0; i < all_peers.size(); i++) {
    Peer* p = &all_peers[i];

    // don't bother synching with self
    if (p->id == my_id) {
      continue;
    }

    // Set this peer's state to "disconnected" if we haven't heard
    // from it in a while.  The elapsed time is computed by subtraction so
    // the test stays correct when last_recv + timeout would wrap around.
    if ((p->status == Peer::GOOD
         || p->status == Peer::POOR)
        && t - p->last_recv > DISCONNECT_TIME * 1000000) {
      p->status = Peer::DISCONNECTED;
    }

    switch (p->status) {
      case Peer::INITIAL:
	// assume the robot is off until we actually hear from it
	p->status = Peer::OFF;
	// falling through on purpose...
      case Peer::POOR:
      case Peer::GOOD:
	sendSynchPacket(i, msg);      
	break;
      case Peer::DISCONNECTED:
      case Peer::OFF:
	// Unresponsive peers are only probed on about one tick in ten.
	if (rand() % 10 == 0) {
	  sendSynchPacket(i, msg);
	}
	break;
    }
  }
  
  heartbeats++;
}

void CommMgrObject::startTimer() {
  int dummy;

  EventID eid;
  RelativeTime period(1,0);   // seconds, ms
  TimeEventInfoWithRelativeTime time_info(TimeEventInfo::Periodic, period);

  sError error;
  error=SetTimeEvent(&time_info,
                     myOID_,
                     Extra_Entry[entrySyncTimer],
                     &dummy,
                     0,
                     &eid);
  if (error != sSUCCESS) {
    cout << "error :" << error << ":" << endl;
  }
}

void CommMgrObject::init() {

  initNetwork();

  loadDictionary();
  
  // Load my MAC address and look up my ID based on it.
  EtherDriverGetMACAddressMsg msg;
  
  ERA201D1_GetMACAddress(&msg);
  
  my_id = 1;
  my_rank = 0;
  for(unsigned int i=0; i < all_peers.size(); i++) {
    bool match = true;
    for(int b=0; b<6; b++) {
      if(msg.address.octet[b] != all_peers[i].mac[b]) {
	match = false;
	break;
      }
    }
    
    if(match) {
      my_rank = i;
      my_id = all_peers[my_rank].id;
    }
  }

  // Create sockets to listen and send to peers
  for (unsigned int i = 0; i < all_peers.size(); i++) {
    createUDPSender(i, true);
  }
}

void CommMgrObject::dumpInfo() {
  pprintf(TextOutputStream, "CommMgrObject: MAC=%X:%X:%X:%X:%X:%X ID=%d\n",
	  all_peers[my_rank].mac[0], all_peers[my_rank].mac[1],
	  all_peers[my_rank].mac[2], all_peers[my_rank].mac[3],
	  all_peers[my_rank].mac[4], all_peers[my_rank].mac[5], my_id);
}

// Ready-event handler that intentionally does nothing; the event argument
// is ignored.
void CommMgrObject::ReadyGeneric(const OReadyEvent &event) {
}

// Sends the message provided in the event to all peers.
// This sets the timestamp field in the NetMsgHeader.
// The message contains a NetMsgHeader in a buffer that
// contains header.totalLength bytes.
void CommMgrObject::BCastMessage(const ONotifyEvent &event) {

  for (int i = 0; i < event.NumOfData(); i++) {

    NetMsgHeader *header = (NetMsgHeader*)event.Data(i);
    header->timestamp = GetTime();

    for(unsigned int ap_index=0; ap_index<all_peers.size(); ap_index++) {
      if((all_peers[ap_index].status==Peer::GOOD ||
	  all_peers[ap_index].status==Peer::POOR ||
	  all_peers[ap_index].status==Peer::INITIAL) &&
	 all_peers[ap_index].id != my_id)

	send(ap_index, header, header->totalLength);
    }
  }

  observer[obsBCastMessage]->AssertReady();
}

// Sends the message provided in the event to a single peer.
// This sets the timestamp field in the NetMsgHeader.
// The message contains a NetMsgHeader in a buffer that
// contains header.totalLength bytes.
void CommMgrObject::SendMessage(const ONotifyEvent &event) {

  for (int i = 0; i < event.NumOfData(); i++) {
    NetMsgHeader *header = (NetMsgHeader*)event.Data(i);
    header->timestamp = GetTime();
    
    int ap_index = idToRankMap[header->receiverID];

    if(ap_index!=-1 &&
       (all_peers[ap_index].status==Peer::GOOD ||
	all_peers[ap_index].status==Peer::POOR ||
	all_peers[ap_index].status==Peer::INITIAL))
      send(ap_index, header, header->totalLength);
  }

  observer[obsSendMessage]->AssertReady();
}

// Returns t with the peer's offset estimate added (convert() with
// addoffset = true).
unsigned long CommMgrObject::convertto(Peer* peer, unsigned long t) {
  return convert(peer, t, true);
}

// Returns t with the peer's offset estimate subtracted (convert() with
// addoffset = false).
unsigned long CommMgrObject::convertfrom(Peer* peer, unsigned long t) {
  return convert(peer, t, false);
}

// Shifts the time value t by the peer's offset estimate.
//
//   peer      - peer whose offsetEstimate is applied
//   t         - time value to shift
//   addoffset - true adds the estimate, false subtracts it
//
// Returns the shifted time.
unsigned long CommMgrObject::convert(Peer* peer, unsigned long t, bool addoffset) {
  // The estimate is reduced to a whole number before it is applied.
  const long estimate = peer->offsetEstimate;
  const long shift = addoffset ? estimate : -estimate;

  return t + shift;
}

void CommMgrObject::loadDictionary() {
 
  Dictionary addr_info;
  char field_name[256];
  const char *field_value = NULL;
  addr_info.read("/MS/config/macaddr.cfg");

  // Set idToRankMap entries to -1. -1 indicates that we don't know
  // anything about a robot with the specified ID.
  for (int i = 0; i <= MAX_ID; i++) {
    idToRankMap[i] = -1;
  }

  // Read all of our ers210 fields. We look through all possible
  // names (i.e. ers210_<num> and see which entries actually
  // exist. We then read the info for the ones that are
  // actually there.
  
  int nEntries = 0;

  for(int i=0; i < MAX_BOTS; i++) {
    sprintf(field_name, "ers210_%02d", i);

    // see if there is an entry for this robot name
    if(addr_info.getValueString(field_name, field_value)) {
      // okay, this robot exists in the file. Make sure we
      // can read all of the info.
      Peer temp = Peer();
      
      int ip[4];
      if(sscanf(field_value, "MAC=%X:%X:%X:%X:%X:%X IP=%d.%d.%d.%d ID=%d",
		&temp.mac[0], &temp.mac[1], &temp.mac[2], 
		&temp.mac[3], &temp.mac[4], &temp.mac[5],
		&ip[0], &ip[1], &ip[2], &ip[3],
		&temp.id)!=11 ||
	 temp.id < 0 ||
	 temp.id > MAX_ID) {
	
        cout << "died reading: " << field_name << endl;
	
      } else {
	// We successfully read all of the info
        uint32 ip32 = ip[0] << 24 | ip[1] << 16 | ip[2] << 8 | ip[3];
        temp.ip = IPAddress(ip32);
        temp.rank = nEntries;
        if (idToRankMap[temp.id] != -1) {
          pprintf(TextOutputStream, 
                  "CommMgrObject warning: multiple robots were given id %d in macaddr.cfg\n",
                  temp.id);
        } else {
          idToRankMap[temp.id] = nEntries;
        }

	all_peers.push_back(temp);
        nEntries++;
      }      
    }
  }
  
  // Read all of our ers7 fields
  for(int i=0; i < MAX_BOTS; i++) {
    sprintf(field_name, "ers7_%02d", i);
    
    // see if there is an entry for this robot name
    if(addr_info.getValueString(field_name, field_value)) {
      // okay, this robot exists in the file. Make sure we
      // can read all of the info.
      Peer temp = Peer();
      
      int ip[4];
      if(sscanf(field_value, "MAC=%X:%X:%X:%X:%X:%X IP=%d.%d.%d.%d ID=%d",
		&temp.mac[0], &temp.mac[1], &temp.mac[2], 
		&temp.mac[3], &temp.mac[4], &temp.mac[5],
		&ip[0], &ip[1], &ip[2], &ip[3],
		&temp.id)!=11) {

	cout << "died reading: " << field_name << endl;
      } else {
	// We successfully read all of the info
        uint32 ip32 = ip[0] << 24 | ip[1] << 16 | ip[2] << 8 | ip[3];
        temp.ip = IPAddress(ip32);
        temp.rank = nEntries;
        if (idToRankMap[temp.id] != -1) {
          pprintf(TextOutputStream, 
                  "CommMgrObject warning: multiple robots were given id %d in macaddr.cfg\n",
                  temp.id);
        }
        else {
          idToRankMap[temp.id] = nEntries;
        }

	all_peers.push_back(temp);
        nEntries++;
      }
    }
  }  
}
 
