package seng302; import seng302.Mock.*; import seng302.Networking.BinaryMessageDecoder; import seng302.Networking.Exceptions.InvalidMessageException; import seng302.Networking.Messages.*; import java.io.DataInputStream; import java.io.IOException; import java.net.Socket; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Map; import static seng302.Networking.Utils.ByteConverter.bytesToShort; /** * TCP client which receives packets/messages from a race data source (e.g., mock source, official source), and exposes them to any observers. */ public class VisualiserInput implements Runnable { ///Timestamp of the last heartbeat. private long lastHeartbeatTime = -1; ///Sequence number of the last heartbeat. private long lastHeartbeatSequenceNum = -1; ///The socket that we have connected to. private Socket connectionSocket; ///Object to store parsed course data. //TODO comment? private StreamedCourse course; private final AC35RaceStatus ac35RaceStatus; ///InputStream (from the socket). private DataInputStream inStream; /** * Ctor. * @param socket Socket from which we will receive race data. * @param course TODO comment? * @throws IOException If there is something wrong with the socket's input stream. */ public VisualiserInput(Socket socket, StreamedCourse course) throws IOException { this.connectionSocket = socket; //We wrap a DataInputStream around the socket's InputStream because it has the stream.readFully(buffer) function, which is a blocking read until the buffer has been filled. this.inStream = new DataInputStream(connectionSocket.getInputStream()); this.course = course; this.ac35RaceStatus = new AC35RaceStatus(this); this.lastHeartbeatTime = System.currentTimeMillis(); } /** * Provides StreamedCourse container for fixed course data. * @return Course for current VisualiserInput instance. * @see seng302.Mock.StreamedCourse */ public StreamedCourse getCourse() { return course; } /** * Returns the last boat location message associated with the given boat source ID. * @param sourceID Unique global identifier for the boat. * @return The most recent location message. */ public BoatLocation getBoatLocationMessage(int sourceID) { return ac35RaceStatus.boatLocation.get(sourceID); } public BoatStatus getBoatStatusMessage(int sourceID) { return boatStatusMap.get(sourceID); } /** * Calculates the time since last heartbeat, in milliseconds. * @return Time since last heartbeat, in milliseconds.. */ private double timeSinceHeartbeat() { long now = System.currentTimeMillis(); return (now - lastHeartbeatTime); } /** * Returns the boat locations map. Maps from Integer (Boat ID) to BoatLocation. * @return Map of boat locations. */ public Map getBoatLocationMap() { return ac35RaceStatus.boatLocation; } /** * Gets the status of the race. * @return The status of the race. */ public RaceStatus getRaceStatus() { return ac35RaceStatus.raceStatus; } /** * Returns the boat statuses map. Maps from Integer (Boat ID) to BoatStatus. * @return Map of boat statuses. */ public Map getBoatStatusMap() { return ac35RaceStatus.boatStatusMap; } /** * Returns the average wind of the race. * @return Average wind in the race. */ public AverageWind getAverageWind() { return ac35RaceStatus.averageWind; } /** * Returns winds in the course. * @return Winds that are in the course. */ public CourseWinds getCourseWinds() { return ac35RaceStatus.courseWinds; } /** * Returns the mark roundings map. Maps from Integer (Boat ID) to MarkRounding. * @return Map of mark roundings. */ public Map getMarkRoundingMap() { return ac35RaceStatus.markRoundingMap; } /** * Sets the wind direction for the current course. * @param direction The new wind direction for the course. */ protected void setCourseWindDirection(double direction) { this.course.setWindDirection(direction); } /** * Sets last time the heartbeat was read * @param lastHeartbeatTime time that the heart beat was read (in ms) */ public void setLastHeartbeatTime(long lastHeartbeatTime) { this.lastHeartbeatTime = lastHeartbeatTime; } /** * gets the last sequencenumber of the heartbeat * @return the last heartbeat sequence number */ public long getLastHeartbeatSequenceNum() { return lastHeartbeatSequenceNum; } /** * Sets the last heartbeat squence number * @param lastHeartbeatSequenceNum sequence number of heartbeat */ public void setLastHeartbeatSequenceNum(long lastHeartbeatSequenceNum) { this.lastHeartbeatSequenceNum = lastHeartbeatSequenceNum; } /** * Reads and returns the next message as an array of bytes from the socket. Use getNextMessage() to get the actual message object instead. * @return Encoded binary message bytes. * @throws IOException Thrown when an error occurs while reading from the socket. */ private byte[] getNextMessageBytes() throws IOException { inStream.mark(0); short CRCLength = 4; short headerLength = 15; //Read the header of the next message. byte[] headerBytes = new byte[headerLength]; inStream.readFully(headerBytes); //Read the message body length. byte[] messageBodyLengthBytes = Arrays.copyOfRange(headerBytes, headerLength - 2, headerLength); short messageBodyLength = bytesToShort(messageBodyLengthBytes); //Read the message body. byte[] messageBodyBytes = new byte[messageBodyLength]; inStream.readFully(messageBodyBytes); //Read the message CRC. byte[] messageCRCBytes = new byte[CRCLength]; inStream.readFully(messageCRCBytes); //Put the head + body + crc into one large array. ByteBuffer messageBytes = ByteBuffer.allocate(headerBytes.length + messageBodyBytes.length + messageCRCBytes.length); messageBytes.put(headerBytes); messageBytes.put(messageBodyBytes); messageBytes.put(messageCRCBytes); return messageBytes.array(); } /** * Reads and returns the next message object from the socket. * @return The message object. Use instanceof for concrete type. * @throws IOException Thrown when an error occurs while reading from the socket. * @throws InvalidMessageException Thrown when the message is invalid in some way. */ private AC35Data getNextMessage() throws IOException, InvalidMessageException { //Get the next message from the socket as a block of bytes. byte[] messageBytes = this.getNextMessageBytes(); //Decode the binary message into an appropriate message object. BinaryMessageDecoder decoder = new BinaryMessageDecoder(messageBytes); return decoder.decode(); } /** * Main loop which reads messages from the socket, and exposes them. */ public void run(){ boolean receiverLoop = true; //receiver loop that gets the input while (receiverLoop) { //If no heartbeat has been received in more the heartbeat period //then the connection will need to be restarted. //System.out.println("time since last heartbeat: " + timeSinceHeartbeat());//TEMP REMOVE long heartBeatPeriod = 10 * 1000; if (timeSinceHeartbeat() > heartBeatPeriod) { System.out.println("Connection has stopped, trying to reconnect."); //Attempt to reconnect the socket. try {//This attempt doesn't really work. Under what circumstances would this.connectionSocket = new Socket(this.connectionSocket.getInetAddress(), this.connectionSocket.getPort()); //this.connectionSocket.connect(this.connectionSocket.getRemoteSocketAddress()); //Reset the heartbeat timer. this.lastHeartbeatTime = System.currentTimeMillis(); } catch (IOException e) { System.err.println("Unable to reconnect."); try { Thread.sleep(500); } catch (InterruptedException e1) { e1.printStackTrace(); } //Swallow the exception. continue; } } //Reads the next message. AC35Data message; try { message = this.getNextMessage(); } catch (InvalidMessageException | IOException e) { //Prints exception to stderr, and iterate loop (that is, read the next message). System.err.println("Unable to read message: " + e.getMessage()); try { inStream.reset(); } catch (IOException e1) { //swallow the exception } //Continue to the next loop iteration/message. continue; } ac35RaceStatus.update(message); } } }