You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

442 lines
17 KiB

package seng302;
import org.xml.sax.SAXException;
import seng302.Mock.*;
import seng302.Networking.BinaryMessageDecoder;
import seng302.Networking.Exceptions.InvalidMessageException;
import seng302.Networking.Messages.*;
import javax.xml.parsers.ParserConfigurationException;
import java.io.*;
import java.net.*;
import java.nio.ByteBuffer;
import java.text.ParseException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import static seng302.Networking.Utils.ByteConverter.bytesToInt;
import static seng302.Networking.Utils.ByteConverter.bytesToShort;
/**
 * TCP client which receives packets/messages from a race data source (e.g., mock source, official source), and exposes them to any observers.
 * <p>
 * The receive loop in {@link #run()} reads one framed message at a time from the socket, decodes it,
 * stores it in an internal queue, and updates the "latest message" state exposed by the getters.
 * If no heartbeat has been seen for longer than the heartbeat period, the connection is re-established.
 * <p>
 * NOTE(review): the maps returned by the getters are plain HashMaps that are mutated by the receive loop.
 * Presumably they are read from other threads - confirm whether callers need synchronisation.
 */
public class VisualiserInput implements Runnable {

    ///Length, in bytes, of the header of each message.
    private static final int HEADER_LENGTH = 15;

    ///Length, in bytes, of the CRC that trails each message.
    private static final int CRC_LENGTH = 4;

    ///How long we wait before retrying after a failed read or a failed reconnect. Measured in milliseconds.
    private static final long RETRY_WAIT_PERIOD = 500;

    ///A queue that contains messages that have been received, and need to be handled.
    private final ArrayBlockingQueue<AC35Data> messagesReceivedQueue = new ArrayBlockingQueue<>(1000000);//We have room for 1,000,000. Is this much capacity actually needed?

    ///Timestamp of the last heartbeat.
    private long lastHeartbeatTime = -1;

    ///Sequence number of the last heartbeat.
    private long lastHeartbeatSequenceNum = -1;

    ///How long we should wait for a heartbeat before assuming the connection is dead. Measured in milliseconds.
    private long heartBeatPeriod = 10 * 1000;

    ///The socket that we have connected to.
    private Socket connectionSocket;

    ///Container for the parsed course data. XML messages and wind direction updates are written into it.
    private StreamedCourse course;

    ///The last RaceStatus message received.
    private RaceStatus raceStatus;

    ///A map of the last BoatStatus message received, for each boat.
    private final Map<Integer, BoatStatus> boatStatusMap = new HashMap<>();

    ///A map of the last BoatLocation message received, for each boat.
    private final Map<Integer, BoatLocation> boatLocationMap = new HashMap<>();

    ///The last AverageWind message received.
    private AverageWind averageWind;

    ///The last CourseWinds message received.
    private CourseWinds courseWinds;

    ///A map of the last MarkRounding message received, for each boat.
    private final Map<Integer, MarkRounding> markRoundingMap = new HashMap<>();

    ///The last RaceStartStatus message received.
    private RaceStartStatus raceStartStatus;

    ///InputStream (from the socket).
    private DataInputStream inStream;

    ///Whether the receive loop in run() should keep iterating.
    private boolean receiverLoop = true;

    /**
     * Ctor.
     * @param socket Socket from which we will receive race data.
     * @param course Course container that received XML messages and wind direction updates are written into.
     * @throws IOException If there is something wrong with the socket's input stream.
     */
    public VisualiserInput(Socket socket, StreamedCourse course) throws IOException {
        this.connectionSocket = socket;
        //We wrap a DataInputStream around the socket's InputStream because it has the stream.readFully(buffer) function, which is a blocking read until the buffer has been filled.
        this.inStream = new DataInputStream(connectionSocket.getInputStream());
        this.course = course;
        this.lastHeartbeatTime = System.currentTimeMillis();
    }

    /**
     * Provides StreamedCourse container for fixed course data.
     * @return Course for current VisualiserInput instance.
     * @see seng302.Mock.StreamedCourse
     */
    public StreamedCourse getCourse() {
        return course;
    }

    /**
     * Returns the last boat location message associated with the given boat source ID.
     * @param sourceID Unique global identifier for the boat.
     * @return The most recent location message, or null if none has been received for that boat.
     */
    public BoatLocation getBoatLocationMessage(int sourceID) {
        return boatLocationMap.get(sourceID);
    }

    /**
     * Calculates the time since last heartbeat, in milliseconds.
     * @return Time since last heartbeat, in milliseconds.
     */
    private long timeSinceHeartbeat() {
        return System.currentTimeMillis() - lastHeartbeatTime;
    }

    /**
     * Returns the boat locations map. Maps from Integer (Boat ID) to BoatLocation.
     * @return Map of boat locations.
     */
    public Map<Integer, BoatLocation> getBoatLocationMap() {
        return boatLocationMap;
    }

    /**
     * Gets the status of the race.
     * @return The last RaceStatus message received, or null if none has been received yet.
     */
    public RaceStatus getRaceStatus() {
        return raceStatus;
    }

    /**
     * Returns the boat statuses map. Maps from Integer (Boat ID) to BoatStatus.
     * @return Map of boat statuses.
     */
    public Map<Integer, BoatStatus> getBoatStatusMap() {
        return boatStatusMap;
    }

    /**
     * Returns the average wind of the race.
     * @return The last AverageWind message received, or null if none has been received yet.
     */
    public AverageWind getAverageWind() {
        return averageWind;
    }

    /**
     * Returns winds in the course.
     * @return The last CourseWinds message received, or null if none has been received yet.
     */
    public CourseWinds getCourseWinds() {
        return courseWinds;
    }

    /**
     * Returns the mark roundings map. Maps from Integer (Boat ID) to MarkRounding.
     * @return Map of mark roundings.
     */
    public Map<Integer, MarkRounding> getMarkRoundingMap() {
        return markRoundingMap;
    }

    /**
     * Sets the wind direction for the current course.
     * @param direction The wind direction to pass on to the course.
     */
    private void setCourseWindDirection(double direction) {
        this.course.setWindDirection(direction);
    }

    /**
     * Reads and returns the next message as an array of bytes from the socket. Use getNextMessage() to get the actual message object instead.
     * @return Encoded binary message bytes (header, then body, then CRC).
     * @throws IOException Thrown when an error occurs while reading from the socket.
     */
    private byte[] getNextMessageBytes() throws IOException {
        //Read the header of the next message.
        byte[] headerBytes = new byte[HEADER_LENGTH];
        inStream.readFully(headerBytes);

        //The message body length is stored in the last two bytes of the header.
        byte[] messageBodyLengthBytes = Arrays.copyOfRange(headerBytes, HEADER_LENGTH - 2, HEADER_LENGTH);
        //Treat the length as an unsigned 16-bit value, so that a body longer than 32767 bytes
        //does not turn into a negative array size.
        int messageBodyLength = bytesToShort(messageBodyLengthBytes) & 0xFFFF;

        //Read the message body.
        byte[] messageBodyBytes = new byte[messageBodyLength];
        inStream.readFully(messageBodyBytes);

        //Read the message CRC.
        byte[] messageCRCBytes = new byte[CRC_LENGTH];
        inStream.readFully(messageCRCBytes);

        //Put the head + body + crc into one large array.
        ByteBuffer messageBytes = ByteBuffer.allocate(headerBytes.length + messageBodyBytes.length + messageCRCBytes.length);
        messageBytes.put(headerBytes);
        messageBytes.put(messageBodyBytes);
        messageBytes.put(messageCRCBytes);
        return messageBytes.array();
    }

    /**
     * Reads and returns the next message object from the socket.
     * @return The message object. Use instanceof for concrete type.
     * @throws IOException Thrown when an error occurs while reading from the socket.
     * @throws InvalidMessageException Thrown when the message is invalid in some way.
     */
    private AC35Data getNextMessage() throws IOException, InvalidMessageException {
        //Get the next message from the socket as a block of bytes.
        byte[] messageBytes = this.getNextMessageBytes();
        //Decode the binary message into an appropriate message object.
        BinaryMessageDecoder decoder = new BinaryMessageDecoder(messageBytes);
        return decoder.decode();
    }

    /**
     * Main loop which reads messages from the socket, and exposes them.
     * The loop stops if the thread is interrupted while it is waiting to retry.
     */
    @Override
    public void run() {
        this.receiverLoop = true;

        //receiver loop that gets the input
        while (receiverLoop) {
            //If no heartbeat has been received in more than the heartbeat period
            //then the connection will need to be restarted.
            if (timeSinceHeartbeat() > this.heartBeatPeriod) {
                System.out.println("Connection has stopped, trying to reconnect.");
                try {
                    reconnect();
                } catch (IOException e) {
                    System.err.println("Unable to reconnect.");
                    //Wait a little before the next attempt, rather than hammering the remote host.
                    pause(RETRY_WAIT_PERIOD);
                    continue;
                }
            }

            //Reads the next message.
            AC35Data message;
            try {
                message = this.getNextMessage();
            } catch (InvalidMessageException e) {
                //Prints exception to stderr, and iterate loop (that is, read the next message).
                System.err.println("Unable to read message: " + e.getMessage());
                continue;
            } catch (IOException e) {
                System.err.println("Unable to read message: " + e.getMessage());
                //A failing stream tends to keep failing; wait before retrying so that we do not spin
                //at full speed until the heartbeat timeout triggers a reconnect.
                pause(RETRY_WAIT_PERIOD);
                continue;
            }

            enqueueMessage(message);
            handleMessage(message);
        }
    }

    /**
     * Opens a fresh connection to the same remote address and port, switches the input stream over to it,
     * and resets the heartbeat tracking. The previous socket is closed once the new one is in place.
     * @throws IOException If the new connection (or its input stream) could not be opened. In that case
     *                     the existing socket and stream are left untouched.
     */
    private void reconnect() throws IOException {
        Socket oldSocket = this.connectionSocket;
        Socket newSocket = new Socket(oldSocket.getInetAddress(), oldSocket.getPort());

        DataInputStream newStream;
        try {
            newStream = new DataInputStream(newSocket.getInputStream());
        } catch (IOException e) {
            closeQuietly(newSocket);
            throw e;
        }

        //Reads must come from the new socket's stream, otherwise the reconnect has no effect.
        this.connectionSocket = newSocket;
        this.inStream = newStream;

        //Reset the heartbeat timer.
        this.lastHeartbeatTime = System.currentTimeMillis();
        //Accept whatever sequence number the new connection sends first.
        //NOTE(review): assumes the source may restart its heartbeat numbering on a new connection - confirm.
        this.lastHeartbeatSequenceNum = -1;

        closeQuietly(oldSocket);
    }

    /**
     * Closes the given socket, ignoring any error raised while doing so.
     * @param socket The socket to close.
     */
    private static void closeQuietly(Socket socket) {
        try {
            socket.close();
        } catch (IOException ignored) {
            //The socket is being discarded; there is nothing useful to do if closing it fails.
        }
    }

    /**
     * Sleeps for the given period. If the thread is interrupted while sleeping, the interrupt flag is
     * restored and the receive loop is told to stop.
     * @param millis How long to sleep, in milliseconds.
     */
    private void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.receiverLoop = false;
        }
    }

    /**
     * Adds a message to the received-message queue. If the queue is full, the oldest queued messages
     * are discarded to make room, instead of throwing and terminating the receive loop.
     * @param message The message to store.
     */
    private void enqueueMessage(AC35Data message) {
        while (!this.messagesReceivedQueue.offer(message)) {
            this.messagesReceivedQueue.poll();
        }
    }

    /**
     * Checks which message has been received and updates the corresponding state.
     * Message types without a handler here (the original notes mention DisplayTextMessage, YachtEventCode,
     * YachtActionCode and ChatterText as having no decoder) end up in the unrecognised branch.
     * @param message The decoded message.
     */
    private void handleMessage(AC35Data message) {
        if (message instanceof Heartbeat) {
            handleHeartbeat((Heartbeat) message);
        } else if (message instanceof RaceStatus) {
            handleRaceStatus((RaceStatus) message);
        } else if (message instanceof XMLMessage) {
            handleXMLMessage((XMLMessage) message);
        } else if (message instanceof RaceStartStatus) {
            this.raceStartStatus = (RaceStartStatus) message;
        } else if (message instanceof BoatLocation) {
            handleBoatLocation((BoatLocation) message);
        } else if (message instanceof MarkRounding) {
            handleMarkRounding((MarkRounding) message);
        } else if (message instanceof CourseWinds) {
            this.courseWinds = (CourseWinds) message;
        } else if (message instanceof AverageWind) {
            this.averageWind = (AverageWind) message;
        } else {
            //Unrecognised message.
            System.out.println("Broken Message!");
        }
    }

    /**
     * Records a heartbeat, provided its sequence number is greater than the last one seen.
     * @param heartbeat The heartbeat message.
     */
    private void handleHeartbeat(Heartbeat heartbeat) {
        if (heartbeat.getSequenceNumber() > this.lastHeartbeatSequenceNum) {
            lastHeartbeatTime = System.currentTimeMillis();
            lastHeartbeatSequenceNum = heartbeat.getSequenceNumber();
        }
    }

    /**
     * Stores the race status, records the status of each boat it contains, and updates the course wind direction.
     * @param newRaceStatus The race status message.
     */
    private void handleRaceStatus(RaceStatus newRaceStatus) {
        this.raceStatus = newRaceStatus;
        for (BoatStatus boatStatus : newRaceStatus.getBoatStatuses()) {
            this.boatStatusMap.put(boatStatus.getSourceID(), boatStatus);
        }
        //The course is given the scaled wind direction offset by 180.
        setCourseWindDirection(newRaceStatus.getScaledWindDirection() + 180);
    }

    /**
     * Parses a regatta, race, or boat XML message and hands the resulting reader to the course.
     * Parsing errors are reported to stderr and the message is otherwise ignored.
     * @param xmlMessage The XML message.
     */
    private void handleXMLMessage(XMLMessage xmlMessage) {
        if (xmlMessage.getXmlMsgSubType() == xmlMessage.XMLTypeRegatta) {
            try {
                course.setRegattaXMLReader(new RegattaXMLReader(xmlMessage.getXmlMessage()));
            }
            //TODO REFACTOR should put all of these exceptions behind a RegattaXMLReaderException.
            catch (IOException | SAXException | ParserConfigurationException e) {
                System.err.println("Error creating RegattaXMLReader: " + e.getMessage());
            }
        } else if (xmlMessage.getXmlMsgSubType() == xmlMessage.XMLTypeRace) {
            try {
                course.setStreamedCourseXMLReader(new StreamedCourseXMLReader(xmlMessage.getXmlMessage()));
            }
            //TODO REFACTOR should put all of these exceptions behind a StreamedCourseXMLReaderException.
            catch (IOException | SAXException | ParserConfigurationException | ParseException | StreamedCourseXMLException e) {
                System.err.println("Error creating StreamedCourseXMLReader: " + e.getMessage());
            }
        } else if (xmlMessage.getXmlMsgSubType() == xmlMessage.XMLTypeBoat) {
            try {
                course.setBoatXMLReader(new BoatXMLReader(xmlMessage.getXmlMessage()));
            }
            //TODO REFACTOR should put all of these exceptions behind a BoatXMLReaderException.
            catch (IOException | SAXException | ParserConfigurationException e) {
                System.err.println("Error creating BoatXMLReader: " + e.getMessage());
            }
        }
    }

    /**
     * Stores a boat location message, unless we already hold one for the same boat with an equal or later timestamp.
     * @param boatLocation The boat location message.
     */
    private void handleBoatLocation(BoatLocation boatLocation) {
        int sourceID = boatLocation.getSourceID();
        BoatLocation previous = this.boatLocationMap.get(sourceID);
        if (previous == null || boatLocation.getTime() > previous.getTime()) {
            this.boatLocationMap.put(sourceID, boatLocation);
        }
    }

    /**
     * Stores a mark rounding message, unless we already hold one for the same boat with an equal or later timestamp.
     * @param markRounding The mark rounding message.
     */
    private void handleMarkRounding(MarkRounding markRounding) {
        int sourceID = markRounding.getSourceID();
        MarkRounding previous = this.markRoundingMap.get(sourceID);
        if (previous == null || markRounding.getTime() > previous.getTime()) {
            this.markRoundingMap.put(sourceID, markRounding);
        }
    }
}