package seng302; import javafx.application.Platform; import org.xml.sax.SAXException; import seng302.Mock.*; import seng302.Networking.BinaryMessageDecoder; import seng302.Networking.Exceptions.InvalidMessageException; import seng302.Networking.Messages.*; import javax.xml.parsers.ParserConfigurationException; import java.io.DataInputStream; import java.io.IOException; import java.net.Socket; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import static seng302.Networking.Utils.ByteConverter.bytesToShort; /** * TCP client which receives packets/messages from a race data source (e.g., mock source, official source), and exposes them to any observers. */ public class VisualiserInput implements Runnable { ///Timestamp of the last heartbeat. private long lastHeartbeatTime = -1; ///Sequence number of the last heartbeat. private long lastHeartbeatSequenceNum = -1; ///The socket that we have connected to. private Socket connectionSocket; ///Object to store parsed course data. //TODO comment? private StreamedCourse course; ///The last RaceStatus message received. private RaceStatus raceStatus; ///A map of the last BoatStatus message received, for each boat. private final Map boatStatusMap = new HashMap<>(); ///A map of the last BoatLocation message received, for each boat. private final Map boatLocationMap = new HashMap<>(); ///The last AverageWind message received. private AverageWind averageWind; ///The last CourseWinds message received. private CourseWinds courseWinds; ///A map of the last MarkRounding message received, for each boat. private final Map markRoundingMap = new HashMap<>(); ///InputStream (from the socket). private DataInputStream inStream; /** * Ctor. * @param socket Socket from which we will receive race data. * @param course TODO comment? * @throws IOException If there is something wrong with the socket's input stream. */ public VisualiserInput(Socket socket, StreamedCourse course) throws IOException { this.connectionSocket = socket; //We wrap a DataInputStream around the socket's InputStream because it has the stream.readFully(buffer) function, which is a blocking read until the buffer has been filled. this.inStream = new DataInputStream(connectionSocket.getInputStream()); this.course = course; this.lastHeartbeatTime = System.currentTimeMillis(); } /** * Provides StreamedCourse container for fixed course data. * @return Course for current VisualiserInput instance. * @see seng302.Mock.StreamedCourse */ public StreamedCourse getCourse() { return course; } /** * Returns the last boat location message associated with the given boat source ID. * @param sourceID Unique global identifier for the boat. * @return The most recent location message. */ public BoatLocation getBoatLocationMessage(int sourceID) { return boatLocationMap.get(sourceID); } public BoatStatus getBoatStatusMessage(int sourceID) { return boatStatusMap.get(sourceID); } /** * Calculates the time since last heartbeat, in milliseconds. * @return Time since last heartbeat, in milliseconds.. */ private double timeSinceHeartbeat() { long now = System.currentTimeMillis(); return (now - lastHeartbeatTime); } /** * Returns the boat locations map. Maps from Integer (Boat ID) to BoatLocation. * @return Map of boat locations. */ public Map getBoatLocationMap() { return boatLocationMap; } /** * Gets the status of the race. * @return The status of the race. */ public RaceStatus getRaceStatus() { return raceStatus; } /** * Returns the boat statuses map. Maps from Integer (Boat ID) to BoatStatus. * @return Map of boat statuses. */ public Map getBoatStatusMap() { return boatStatusMap; } /** * Returns the average wind of the race. * @return Average wind in the race. */ public AverageWind getAverageWind() { return averageWind; } /** * Returns winds in the course. * @return Winds that are in the course. */ public CourseWinds getCourseWinds() { return courseWinds; } /** * Returns the mark roundings map. Maps from Integer (Boat ID) to MarkRounding. * @return Map of mark roundings. */ public Map getMarkRoundingMap() { return markRoundingMap; } /** * Sets the wind direction for the current course. * @param direction The new wind direction for the course. */ private void setCourseWindDirection(double direction) { this.course.setWindDirection(direction); } /** * Reads and returns the next message as an array of bytes from the socket. Use getNextMessage() to get the actual message object instead. * @return Encoded binary message bytes. * @throws IOException Thrown when an error occurs while reading from the socket. */ private byte[] getNextMessageBytes() throws IOException { inStream.mark(0); short CRCLength = 4; short headerLength = 15; //Read the header of the next message. byte[] headerBytes = new byte[headerLength]; inStream.readFully(headerBytes); //Read the message body length. byte[] messageBodyLengthBytes = Arrays.copyOfRange(headerBytes, headerLength - 2, headerLength); short messageBodyLength = bytesToShort(messageBodyLengthBytes); //Read the message body. byte[] messageBodyBytes = new byte[messageBodyLength]; inStream.readFully(messageBodyBytes); //Read the message CRC. byte[] messageCRCBytes = new byte[CRCLength]; inStream.readFully(messageCRCBytes); //Put the head + body + crc into one large array. ByteBuffer messageBytes = ByteBuffer.allocate(headerBytes.length + messageBodyBytes.length + messageCRCBytes.length); messageBytes.put(headerBytes); messageBytes.put(messageBodyBytes); messageBytes.put(messageCRCBytes); return messageBytes.array(); } /** * Reads and returns the next message object from the socket. * @return The message object. Use instanceof for concrete type. * @throws IOException Thrown when an error occurs while reading from the socket. * @throws InvalidMessageException Thrown when the message is invalid in some way. */ private AC35Data getNextMessage() throws IOException, InvalidMessageException { //Get the next message from the socket as a block of bytes. byte[] messageBytes = this.getNextMessageBytes(); //Decode the binary message into an appropriate message object. BinaryMessageDecoder decoder = new BinaryMessageDecoder(messageBytes); return decoder.decode(); } /** * Main loop which reads messages from the socket, and exposes them. */ public void run(){ boolean receiverLoop = true; //receiver loop that gets the input while (receiverLoop) { //If no heartbeat has been received in more the heartbeat period //then the connection will need to be restarted. //System.out.println("time since last heartbeat: " + timeSinceHeartbeat());//TEMP REMOVE long heartBeatPeriod = 10 * 1000; if (timeSinceHeartbeat() > heartBeatPeriod) { System.out.println("Connection has stopped, trying to reconnect."); //Attempt to reconnect the socket. try {//This attempt doesn't really work. Under what circumstances would this.connectionSocket = new Socket(this.connectionSocket.getInetAddress(), this.connectionSocket.getPort()); //this.connectionSocket.connect(this.connectionSocket.getRemoteSocketAddress()); //Reset the heartbeat timer. this.lastHeartbeatTime = System.currentTimeMillis(); } catch (IOException e) { System.err.println("Unable to reconnect."); //Wait 500ms. Ugly hack, should refactor. long waitPeriod = 500; long waitTimeStart = System.currentTimeMillis() + waitPeriod; while (System.currentTimeMillis() < waitTimeStart){ //Nothing. Busyloop. } //Swallow the exception. continue; } } //Reads the next message. AC35Data message; try { message = this.getNextMessage(); } catch (InvalidMessageException | IOException e) { //Prints exception to stderr, and iterate loop (that is, read the next message). System.err.println("Unable to read message: " + e.getMessage()); try { inStream.reset(); } catch (IOException e1) { e1.printStackTrace(); } //Continue to the next loop iteration/message. continue; }/* //Add it to message queue. this.messagesReceivedQueue.add(message);*/ //Checks which message is being received and does what is needed for that message. //Heartbeat. if (message instanceof Heartbeat) { Heartbeat heartbeat = (Heartbeat) message; //Check that the heartbeat number is greater than the previous value, and then set the last heartbeat time. if (heartbeat.getSequenceNumber() > this.lastHeartbeatSequenceNum) { lastHeartbeatTime = System.currentTimeMillis(); lastHeartbeatSequenceNum = heartbeat.getSequenceNumber(); //System.out.println("HeartBeat Message! " + lastHeartbeatSequenceNum); } } //RaceStatus. else if (message instanceof RaceStatus) { RaceStatus raceStatus = (RaceStatus) message; //System.out.println("Race Status Message"); this.raceStatus = raceStatus; for (BoatStatus boatStatus: this.raceStatus.getBoatStatuses()) { this.boatStatusMap.put(boatStatus.getSourceID(), boatStatus); } setCourseWindDirection(raceStatus.getScaledWindDirection()); } //DisplayTextMessage. /*else if (message instanceof DisplayTextMessage) { //System.out.println("Display Text Message"); //No decoder for this. }*/ //XMLMessage. else if (message instanceof XMLMessage) { XMLMessage xmlMessage = (XMLMessage) message; //System.out.println("XML Message!"); Platform.runLater(()-> { if (xmlMessage.getXmlMsgSubType() == XMLMessage.XMLTypeRegatta) { //System.out.println("Setting Regatta"); try { course.setRegattaXMLReader(new RegattaXMLReader(xmlMessage.getXmlMessage())); } //TODO REFACTOR should put all of these exceptions behind a RegattaXMLReaderException. catch (IOException | SAXException | ParserConfigurationException e) { System.err.println("Error creating RegattaXMLReader: " + e.getMessage()); //Continue to the next loop iteration/message. } } else if (xmlMessage.getXmlMsgSubType() == XMLMessage.XMLTypeRace) { //System.out.println("Setting Course"); try { course.setStreamedCourseXMLReader(new StreamedCourseXMLReader(xmlMessage.getXmlMessage())); } //TODO REFACTOR should put all of these exceptions behind a StreamedCourseXMLReaderException. catch (IOException | SAXException | ParserConfigurationException | StreamedCourseXMLException e) { System.err.println("Error creating StreamedCourseXMLReader: " + e.getMessage()); //Continue to the next loop iteration/message. } } else if (xmlMessage.getXmlMsgSubType() == XMLMessage.XMLTypeBoat) { //System.out.println("Setting Boats"); try { course.setBoatXMLReader(new BoatXMLReader(xmlMessage.getXmlMessage())); } //TODO REFACTOR should put all of these exceptions behind a BoatXMLReaderException. catch (IOException | SAXException | ParserConfigurationException e) { System.err.println("Error creating BoatXMLReader: " + e.getMessage()); //Continue to the next loop iteration/message. } } }); } //RaceStartStatus. else if (message instanceof RaceStartStatus) { //System.out.println("Race Start Status Message"); } //YachtEventCode. /*else if (message instanceof YachtEventCode) { YachtEventCode yachtEventCode = (YachtEventCode) message; //System.out.println("Yacht Event Code!"); //No decoder for this. }*/ //YachtActionCode. /*else if (message instanceof YachtActionCode) { YachtActionCode yachtActionCode = (YachtActionCode) message; //System.out.println("Yacht Action Code!"); //No decoder for this. }*/ //ChatterText. /*else if (message instanceof ChatterText) { ChatterText chatterText = (ChatterText) message; //System.out.println("Chatter Text Message!"); //No decoder for this. }*/ //BoatLocation. else if (message instanceof BoatLocation) { BoatLocation boatLocation = (BoatLocation) message; //System.out.println("Boat Location!"); if (this.boatLocationMap.containsKey(boatLocation.getSourceID())) { //If our boatlocation map already contains a boat location message for this boat, check that the new message is actually for a later timestamp (i.e., newer). if (boatLocation.getTime() > this.boatLocationMap.get(boatLocation.getSourceID()).getTime()){ //If it is, replace the old message. this.boatLocationMap.put(boatLocation.getSourceID(), boatLocation); } }else{ //If the map _doesn't_ already contain a message for this boat, insert the message. this.boatLocationMap.put(boatLocation.getSourceID(), boatLocation); } } //MarkRounding. else if (message instanceof MarkRounding) { MarkRounding markRounding = (MarkRounding) message; //System.out.println("Mark Rounding Message!"); if (this.markRoundingMap.containsKey(markRounding.getSourceID())) { //If our markRoundingMap already contains a mark rounding message for this boat, check that the new message is actually for a later timestamp (i.e., newer). if (markRounding.getTime() > this.markRoundingMap.get(markRounding.getSourceID()).getTime()){ //If it is, replace the old message. this.markRoundingMap.put(markRounding.getSourceID(), markRounding); } }else{ //If the map _doesn't_ already contain a message for this boat, insert the message. this.markRoundingMap.put(markRounding.getSourceID(), markRounding); } } //CourseWinds. else if (message instanceof CourseWinds) { //System.out.println("Course Wind Message!"); this.courseWinds = (CourseWinds) message; } //AverageWind. else if (message instanceof AverageWind) { //System.out.println("Average Wind Message!"); this.averageWind = (AverageWind) message; } //Unrecognised message. else { System.out.println("Broken Message!"); } } } }