package visualiser.app; import javafx.application.Platform; import network.BinaryMessageDecoder; import network.Exceptions.InvalidMessageException; import network.Messages.*; import org.xml.sax.SAXException; import shared.dataInput.BoatXMLReader; import shared.dataInput.RaceXMLReader; import shared.dataInput.RegattaXMLReader; import shared.exceptions.InvalidBoatDataException; import shared.exceptions.InvalidRaceDataException; import shared.exceptions.InvalidRegattaDataException; import shared.exceptions.XMLReaderException; import javax.xml.parsers.ParserConfigurationException; import java.io.DataInputStream; import java.io.IOException; import java.net.Socket; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import static network.Utils.ByteConverter.bytesToShort; /** * TCP client which receives packets/messages from a race data source * (e.g., mock source, official source), and exposes them to any observers. */ public class VisualiserInput implements Runnable { /** * Timestamp of the last heartbeat. */ private long lastHeartbeatTime = -1; /** * Sequence number of the last heartbeat. */ private long lastHeartbeatSequenceNum = -1; /** * The socket that we have connected to. */ private Socket connectionSocket; /** * InputStream (from the socket). */ private DataInputStream inStream; /** * An object containing the set of latest messages to write to. * Every server frame, VisualiserInput reads messages from its inputStream, and write them to this. */ private LatestMessages latestMessages; /** * Ctor. * @param socket Socket from which we will receive race data. * @throws IOException If there is something wrong with the socket's input stream. */ public VisualiserInput(Socket socket) throws IOException { this.connectionSocket = socket; //We wrap a DataInputStream around the socket's InputStream because it has the stream.readFully(buffer) function, which is a blocking read until the buffer has been filled. this.inStream = new DataInputStream(connectionSocket.getInputStream()); this.latestMessages = new LatestMessages(); this.lastHeartbeatTime = System.currentTimeMillis(); } /** * Returns the LatestMessages object, which can be queried for any received race related messages. * @return The LatestMessages object. */ public LatestMessages getLatestMessages() { return latestMessages; } /** * Calculates the time since last heartbeat, in milliseconds. * @return Time since last heartbeat, in milliseconds.. */ private double timeSinceHeartbeat() { long now = System.currentTimeMillis(); return (now - lastHeartbeatTime); } /** * Reads and returns the next message as an array of bytes from the socket. Use getNextMessage() to get the actual message object instead. * @return Encoded binary message bytes. * @throws IOException Thrown when an error occurs while reading from the socket. */ private byte[] getNextMessageBytes() throws IOException { inStream.mark(0); short CRCLength = 4; short headerLength = 15; //Read the header of the next message. byte[] headerBytes = new byte[headerLength]; inStream.readFully(headerBytes); //Read the message body length. byte[] messageBodyLengthBytes = Arrays.copyOfRange(headerBytes, headerLength - 2, headerLength); short messageBodyLength = bytesToShort(messageBodyLengthBytes); //Read the message body. byte[] messageBodyBytes = new byte[messageBodyLength]; inStream.readFully(messageBodyBytes); //Read the message CRC. byte[] messageCRCBytes = new byte[CRCLength]; inStream.readFully(messageCRCBytes); //Put the head + body + crc into one large array. ByteBuffer messageBytes = ByteBuffer.allocate(headerBytes.length + messageBodyBytes.length + messageCRCBytes.length); messageBytes.put(headerBytes); messageBytes.put(messageBodyBytes); messageBytes.put(messageCRCBytes); return messageBytes.array(); } /** * Reads and returns the next message object from the socket. * @return The message object. Use instanceof for concrete type. * @throws IOException Thrown when an error occurs while reading from the socket. * @throws InvalidMessageException Thrown when the message is invalid in some way. */ private AC35Data getNextMessage() throws IOException, InvalidMessageException { //Get the next message from the socket as a block of bytes. byte[] messageBytes = this.getNextMessageBytes(); //Decode the binary message into an appropriate message object. BinaryMessageDecoder decoder = new BinaryMessageDecoder(messageBytes); return decoder.decode(); } /** * Main loop which reads messages from the socket, and exposes them. */ public void run(){ boolean receiverLoop = true; //receiver loop that gets the input while (receiverLoop) { //If no heartbeat has been received in more the heartbeat period //then the connection will need to be restarted. //System.out.println("time since last heartbeat: " + timeSinceHeartbeat());//TEMP REMOVE long heartBeatPeriod = 10 * 1000; if (timeSinceHeartbeat() > heartBeatPeriod) { System.out.println("Connection has stopped, trying to reconnect."); //Attempt to reconnect the socket. try {//This attempt doesn't really work. Under what circumstances would this.connectionSocket = new Socket(this.connectionSocket.getInetAddress(), this.connectionSocket.getPort()); //this.connectionSocket.connect(this.connectionSocket.getRemoteSocketAddress()); //Reset the heartbeat timer. this.lastHeartbeatTime = System.currentTimeMillis(); } catch (IOException e) { System.err.println("Unable to reconnect."); //Wait 500ms. Ugly hack, should refactor. long waitPeriod = 500; long waitTimeStart = System.currentTimeMillis() + waitPeriod; while (System.currentTimeMillis() < waitTimeStart){ //Nothing. Busyloop. } //Swallow the exception. continue; } } //Reads the next message. AC35Data message; try { message = this.getNextMessage(); } catch (InvalidMessageException | IOException e) { //Prints exception to stderr, and iterate loop (that is, read the next message). System.err.println("Unable to read message: " + e.getMessage()); try { inStream.reset(); } catch (IOException e1) { e1.printStackTrace(); } //Continue to the next loop iteration/message. continue; } //Checks which message is being received and does what is needed for that message. switch (message.getType()) { //Heartbeat. case HEARTBEAT: { Heartbeat heartbeat = (Heartbeat) message; //Check that the heartbeat number is greater than the previous value, and then set the last heartbeat time. if (heartbeat.getSequenceNumber() > this.lastHeartbeatSequenceNum) { lastHeartbeatTime = System.currentTimeMillis(); lastHeartbeatSequenceNum = heartbeat.getSequenceNumber(); //System.out.println("HeartBeat Message! " + lastHeartbeatSequenceNum); } break; } //RaceStatus. case RACESTATUS: { RaceStatus raceStatus = (RaceStatus) message; //System.out.println("Race Status Message"); this.latestMessages.setRaceStatus(raceStatus); for (BoatStatus boatStatus : raceStatus.getBoatStatuses()) { this.latestMessages.setBoatStatus(boatStatus); } break; } //DisplayTextMessage. case DISPLAYTEXTMESSAGE: { //System.out.println("Display Text Message"); //No decoder for this. break; } //XMLMessage. case XMLMESSAGE: { XMLMessage xmlMessage = (XMLMessage) message; //System.out.println("XML Message!"); this.latestMessages.setXMLMessage(xmlMessage); break; } //RaceStartStatus. case RACESTARTSTATUS: { //System.out.println("Race Start Status Message"); break; } //YachtEventCode. case YACHTEVENTCODE: { //YachtEventCode yachtEventCode = (YachtEventCode) message; //System.out.println("Yacht Event Code!"); //No decoder for this. break; } //YachtActionCode. case YACHTACTIONCODE: { //YachtActionCode yachtActionCode = (YachtActionCode) message; //System.out.println("Yacht Action Code!"); // No decoder for this. break; } //ChatterText. case CHATTERTEXT: { //ChatterText chatterText = (ChatterText) message; //System.out.println("Chatter Text Message!"); //No decoder for this. break; } //BoatLocation. case BOATLOCATION: { BoatLocation boatLocation = (BoatLocation) message; //System.out.println("Boat Location!"); BoatLocation existingBoatLocation = this.latestMessages.getBoatLocationMap().get(boatLocation.getSourceID()); if (existingBoatLocation != null) { //If our boatlocation map already contains a boat location message for this boat, check that the new message is actually for a later timestamp (i.e., newer). if (boatLocation.getTime() > existingBoatLocation.getTime()) { //If it is, replace the old message. this.latestMessages.setBoatLocation(boatLocation); } } else { //If the map _doesn't_ already contain a message for this boat, insert the message. this.latestMessages.setBoatLocation(boatLocation); } break; } //MarkRounding. case MARKROUNDING: { MarkRounding markRounding = (MarkRounding) message; //System.out.println("Mark Rounding Message!"); MarkRounding existingMarkRounding = this.latestMessages.getMarkRoundingMap().get(markRounding.getSourceID()); if (existingMarkRounding != null) { //If our markRoundingMap already contains a mark rounding message for this boat, check that the new message is actually for a later timestamp (i.e., newer). if (markRounding.getTime() > existingMarkRounding.getTime()) { //If it is, replace the old message. this.latestMessages.setMarkRounding(markRounding); } } else { //If the map _doesn't_ already contain a message for this boat, insert the message. this.latestMessages.setMarkRounding(markRounding); } break; } //CourseWinds. case COURSEWIND: { //System.out.println("Course Wind Message!"); CourseWinds courseWinds = (CourseWinds) message; this.latestMessages.setCourseWinds(courseWinds); break; } //AverageWind. case AVGWIND: { //System.out.println("Average Wind Message!"); AverageWind averageWind = (AverageWind) message; this.latestMessages.setAverageWind(averageWind); break; } //Unrecognised message. default: { System.out.println("Broken Message!"); break; } } } } }