You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

325 lines
10 KiB

package mock.app;
import network.BinaryMessageEncoder;
import network.MessageEncoders.RaceVisionByteEncoder;
import network.Messages.*;
import network.Messages.Enums.MessageType;
import network.Messages.Enums.XMLMessageType;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
/**
* TCP server to send race information to connected clients.
*/
public class MockOutput implements Runnable
{
/**
* Timestamp of the last sent heartbeat message.
*/
private long lastHeartbeatTime;
/**
* Period for the heartbeat - that is, how often we send it.
*/
private double heartbeatPeriod = 5.0;
/**
* Output stream which wraps around mockSocket outstream.
*/
private DataOutputStream outToVisualiser;
/**
* An object containing the set of latest messages to send.
* Every server frame, MockOutput reads messages from this, and send them.
*/
private LatestMessages latestMessages;
/**
* Ack numbers used in messages.
*/
private int ackNumber = 1;
/**
* Sequence number for heartbeat messages.
*/
private int heartbeatSequenceNum = 1;
private boolean stop = false; //whether or not hte thread keeps running
/**
* Ctor.
* @param latestMessages Latests Messages that the Mock is to send out
* @param outToVisualiser DataStream to output to Visualisers
* @throws IOException if server socket cannot be opened.
*/
public MockOutput(LatestMessages latestMessages, DataOutputStream outToVisualiser) throws IOException {
this.outToVisualiser = outToVisualiser;
this.lastHeartbeatTime = System.currentTimeMillis();
this.latestMessages = latestMessages;
}
/**
* Increments the ackNumber value, and returns it.
* @return Incremented ackNumber.
*/
private int getNextAckNumber(){
this.ackNumber++;
return this.ackNumber;
}
/**
* Calculates the time since last heartbeat message, in seconds.
* @return Time since last heartbeat message, in seconds.
*/
private double timeSinceHeartbeat() {
long now = System.currentTimeMillis();
return (now - lastHeartbeatTime) / 1000.0;
}
/**
* Generates the next heartbeat message and returns it. Increments the heartbeat sequence number.
* @return The next heartbeat message.
*/
private Heartbeat createHeartbeatMessage() {
//Create the heartbeat message.
Heartbeat heartbeat = new Heartbeat(this.heartbeatSequenceNum);
heartbeatSequenceNum++;
return heartbeat;
}
/**
* Serializes a heartbeat message into a packet to be sent, and returns the byte array.
* @param heartbeat The heartbeat message to serialize.
* @return Byte array containing the next heartbeat message.
*/
private byte[] parseHeartbeat(Heartbeat heartbeat) {
//Serializes the heartbeat message.
byte[] heartbeatMessage = RaceVisionByteEncoder.heartBeat(heartbeat);
//Places the serialized message in a packet.
BinaryMessageEncoder binaryMessageEncoder = new BinaryMessageEncoder(
MessageType.HEARTBEAT,
System.currentTimeMillis(),
getNextAckNumber(),
(short) heartbeatMessage.length,
heartbeatMessage );
return binaryMessageEncoder.getFullMessage();
}
/**
* Encodes/serialises a XMLMessage message, and returns it.
* @param xmlMessage The XMLMessage message to serialise.
* @return The XMLMessage message in a serialised form.
*/
private synchronized byte[] parseXMLMessage(XMLMessage xmlMessage) {
//Serialize the xml message.
byte[] encodedXML = RaceVisionByteEncoder.xmlMessage(xmlMessage);
//Place the message in a packet.
BinaryMessageEncoder binaryMessageEncoder = new BinaryMessageEncoder(
MessageType.XMLMESSAGE,
System.currentTimeMillis(),
xmlMessage.getAckNumber(), //We use the ack number from the xml message.
(short) encodedXML.length,
encodedXML );
return binaryMessageEncoder.getFullMessage();
}
/**
* Encodes/serialises a BoatLocation message, and returns it.
* @param boatLocation The BoatLocation message to serialise.
* @return The BoatLocation message in a serialised form.
*/
private synchronized byte[] parseBoatLocation(BoatLocation boatLocation){
//Encodes the message.
byte[] encodedBoatLoc = RaceVisionByteEncoder.boatLocation(boatLocation);
//Encodes the full message with header.
BinaryMessageEncoder binaryMessageEncoder = new BinaryMessageEncoder(
MessageType.BOATLOCATION,
System.currentTimeMillis(),
getNextAckNumber(),
(short) encodedBoatLoc.length,
encodedBoatLoc );
return binaryMessageEncoder.getFullMessage();
}
/**
* Encodes/serialises a RaceStatus message, and returns it.
* @param raceStatus The RaceStatus message to serialise.
* @return The RaceStatus message in a serialised form.
*/
private synchronized byte[] parseRaceStatus(RaceStatus raceStatus){
//Encodes the messages.
byte[] encodedRaceStatus = RaceVisionByteEncoder.raceStatus(raceStatus);
//Encodes the full message with header.
BinaryMessageEncoder binaryMessageEncoder = new BinaryMessageEncoder(
MessageType.RACESTATUS,
System.currentTimeMillis(),
getNextAckNumber(),
(short) encodedRaceStatus.length,
encodedRaceStatus );
return binaryMessageEncoder.getFullMessage();
}
/**
* Sends a heartbeat
* @throws IOException if the socket is no longer open at both ends the heartbeat returns an error.
*/
public void sendHeartBeat() throws IOException {
//Sends a heartbeat every so often.
if (timeSinceHeartbeat() >= heartbeatPeriod) {
outToVisualiser.write(parseHeartbeat(createHeartbeatMessage()));
lastHeartbeatTime = System.currentTimeMillis();
}
}
/**
* Sending loop of the Server
*/
public void run() {
try {
while (!stop){
//Wait until all of the xml files have been set.
if (!this.latestMessages.hasAllXMLMessages()) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
long previousFrameTime = System.currentTimeMillis();
boolean sentXMLs = false;
while(true) {
try {
long currentFrameTime = System.currentTimeMillis();
//This is the time elapsed, in milliseconds, since the last server "frame".
long framePeriod = currentFrameTime - previousFrameTime;
//We only attempt to send packets every X milliseconds.
long minimumFramePeriod = 16;
if (framePeriod >= minimumFramePeriod) {
//Send XML messages.
if (!sentXMLs) {
//Serialise them.
byte[] raceXMLBlob = parseXMLMessage(latestMessages.getRaceXMLMessage());
byte[] regattaXMLBlob = parseXMLMessage(latestMessages.getRegattaXMLMessage());
byte[] boatsXMLBlob = parseXMLMessage(latestMessages.getBoatXMLMessage());
//Send them.
outToVisualiser.write(raceXMLBlob);
outToVisualiser.write(regattaXMLBlob);
outToVisualiser.write(boatsXMLBlob);
sentXMLs = true;
}
//Sends the RaceStatus message.
if (this.latestMessages.getRaceStatus() != null) {
byte[] raceStatusBlob = this.parseRaceStatus(this.latestMessages.getRaceStatus());
this.outToVisualiser.write(raceStatusBlob);
}
//Send all of the BoatLocation messages.
for (int sourceID : this.latestMessages.getBoatLocationMap().keySet()) {
//Get the message.
BoatLocation boatLocation = this.latestMessages.getBoatLocation(sourceID);
if (boatLocation != null) {
//Encode.
byte[] boatLocationBlob = this.parseBoatLocation(boatLocation);
//Write it.
this.outToVisualiser.write(boatLocationBlob);
}
}
previousFrameTime = currentFrameTime;
} else {
//Wait until the frame period will be large enough.
long timeToWait = minimumFramePeriod - framePeriod;
try {
Thread.sleep(timeToWait);
} catch (InterruptedException e) {
//If we get interrupted, exit the function.
e.printStackTrace();
//Re-set the interrupt flag.
Thread.currentThread().interrupt();
return;
}
}
} catch (SocketException e) {
break;
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void stop(){
stop = true;
}
}