You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

275 lines
9.3 KiB

package seng302;
import seng302.Mock.*;
import seng302.Networking.BinaryMessageDecoder;
import seng302.Networking.Exceptions.InvalidMessageException;
import seng302.Networking.Messages.*;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
import static seng302.Networking.Utils.ByteConverter.bytesToShort;
/**
 * TCP client which receives packets/messages from a race data source (e.g., mock source, official source),
 * and exposes them to any observers.
 *
 * <p>The class is a {@link Runnable}: {@link #run()} loops reading one framed message at a time from the
 * socket, decodes it, and passes it to the internal {@code AC35RaceStatus}, whose state is exposed through
 * the getters on this class.
 */
public class VisualiserInput implements Runnable {

    /** Length of the fixed-size message header, in bytes. The last two header bytes hold the body length. */
    private static final int HEADER_LENGTH = 15;

    /** Length of the CRC that follows every message body, in bytes. */
    private static final int CRC_LENGTH = 4;

    /** If no heartbeat has been recorded for this long (in ms), the connection is considered stopped. */
    private static final long HEARTBEAT_TIMEOUT_MS = 10 * 1000;

    /** How long to wait (in ms) before retrying after a failed reconnect or a failed socket read. */
    private static final long RETRY_DELAY_MS = 500;

    /** Timestamp (ms since epoch) of the last heartbeat. */
    private long lastHeartbeatTime = -1;

    /** Sequence number of the last heartbeat. */
    private long lastHeartbeatSequenceNum = -1;

    /** The socket that we have connected to. Replaced when the connection is re-established. */
    private Socket connectionSocket;

    /** Container for the parsed course data. */
    private final StreamedCourse course;

    /** Holds the most recent race state; every decoded message is passed to its {@code update} method. */
    private final AC35RaceStatus ac35RaceStatus;

    /** InputStream (from the socket). Must always wrap the stream of {@link #connectionSocket}. */
    private DataInputStream inStream;

    /**
     * Ctor.
     * @param socket Socket from which we will receive race data.
     * @param course Container for the course data; returned by {@link #getCourse()} and updated by
     *               {@link #setCourseWindDirection(double)}.
     * @throws IOException If there is something wrong with the socket's input stream.
     */
    public VisualiserInput(Socket socket, StreamedCourse course) throws IOException {
        this.connectionSocket = socket;
        //We wrap a DataInputStream around the socket's InputStream because it has the
        //stream.readFully(buffer) function, which is a blocking read until the buffer has been filled.
        this.inStream = new DataInputStream(connectionSocket.getInputStream());
        this.course = course;
        this.ac35RaceStatus = new AC35RaceStatus(this);
        //Start the heartbeat timer from the moment of construction.
        this.lastHeartbeatTime = System.currentTimeMillis();
    }

    /**
     * Provides StreamedCourse container for fixed course data.
     * @return Course for current VisualiserInput instance.
     * @see seng302.Mock.StreamedCourse
     */
    public StreamedCourse getCourse() {
        return course;
    }

    /**
     * Returns the last boat location message associated with the given boat source ID.
     * @param sourceID Unique global identifier for the boat.
     * @return The most recent location message, as stored in the boat location map for that ID.
     */
    public BoatLocation getBoatLocationMessage(int sourceID) {
        return ac35RaceStatus.boatLocation.get(sourceID);
    }

    /**
     * Calculates the time since last heartbeat, in milliseconds.
     * @return Time since last heartbeat, in milliseconds.
     */
    private long timeSinceHeartbeat() {
        return System.currentTimeMillis() - lastHeartbeatTime;
    }

    /**
     * Returns the boat locations map. Maps from Integer (Boat ID) to BoatLocation.
     * @return Map of boat locations.
     */
    public Map<Integer, BoatLocation> getBoatLocationMap() {
        return ac35RaceStatus.boatLocation;
    }

    /**
     * Gets the status of the race.
     * @return The status of the race.
     */
    public RaceStatus getRaceStatus() {
        return ac35RaceStatus.raceStatus;
    }

    /**
     * Returns the boat statuses map. Maps from Integer (Boat ID) to BoatStatus.
     * @return Map of boat statuses.
     */
    public Map<Integer, BoatStatus> getBoatStatusMap() {
        return ac35RaceStatus.boatStatusMap;
    }

    /**
     * Returns the average wind of the race.
     * @return Average wind in the race.
     */
    public AverageWind getAverageWind() {
        return ac35RaceStatus.averageWind;
    }

    /**
     * Returns winds in the course.
     * @return Winds that are in the course.
     */
    public CourseWinds getCourseWinds() {
        return ac35RaceStatus.courseWinds;
    }

    /**
     * Returns the mark roundings map. Maps from Integer (Boat ID) to MarkRounding.
     * @return Map of mark roundings.
     */
    public Map<Integer, MarkRounding> getMarkRoundingMap() {
        return ac35RaceStatus.markRoundingMap;
    }

    /**
     * Sets the wind direction for the current course.
     * @param direction The new wind direction for the course.
     */
    protected void setCourseWindDirection(double direction) {
        this.course.setWindDirection(direction);
    }

    /**
     * Sets the last time the heartbeat was read.
     * @param lastHeartbeatTime Time that the heartbeat was read (in ms).
     */
    public void setLastHeartbeatTime(long lastHeartbeatTime) {
        this.lastHeartbeatTime = lastHeartbeatTime;
    }

    /**
     * Gets the last sequence number of the heartbeat.
     * @return The last heartbeat sequence number.
     */
    public long getLastHeartbeatSequenceNum() {
        return lastHeartbeatSequenceNum;
    }

    /**
     * Sets the last heartbeat sequence number.
     * @param lastHeartbeatSequenceNum Sequence number of heartbeat.
     */
    public void setLastHeartbeatSequenceNum(long lastHeartbeatSequenceNum) {
        this.lastHeartbeatSequenceNum = lastHeartbeatSequenceNum;
    }

    /**
     * Reads and returns the next message as an array of bytes from the socket.
     * Use getNextMessage() to get the actual message object instead.
     *
     * <p>A message is read as: a {@value #HEADER_LENGTH}-byte header whose last two bytes give the body
     * length, then the body, then a {@value #CRC_LENGTH}-byte CRC. Each read blocks until it is complete.
     *
     * @return Encoded binary message bytes (header + body + CRC).
     * @throws IOException Thrown when an error occurs while reading from the socket.
     */
    private byte[] getNextMessageBytes() throws IOException {
        //Read the header of the next message.
        byte[] headerBytes = new byte[HEADER_LENGTH];
        inStream.readFully(headerBytes);

        //Read the message body length. The field is two bytes wide; it is widened as an unsigned
        //value so that lengths above 32767 do not turn into a negative array size.
        byte[] messageBodyLengthBytes = Arrays.copyOfRange(headerBytes, HEADER_LENGTH - 2, HEADER_LENGTH);
        int messageBodyLength = bytesToShort(messageBodyLengthBytes) & 0xFFFF;

        //Read the message body.
        byte[] messageBodyBytes = new byte[messageBodyLength];
        inStream.readFully(messageBodyBytes);

        //Read the message CRC.
        byte[] messageCRCBytes = new byte[CRC_LENGTH];
        inStream.readFully(messageCRCBytes);

        //Put the head + body + crc into one large array.
        ByteBuffer messageBytes = ByteBuffer.allocate(
                headerBytes.length + messageBodyBytes.length + messageCRCBytes.length);
        messageBytes.put(headerBytes);
        messageBytes.put(messageBodyBytes);
        messageBytes.put(messageCRCBytes);
        return messageBytes.array();
    }

    /**
     * Reads and returns the next message object from the socket.
     * @return The message object. Use instanceof for concrete type.
     * @throws IOException Thrown when an error occurs while reading from the socket.
     * @throws InvalidMessageException Thrown when the message is invalid in some way.
     */
    private AC35Data getNextMessage() throws IOException, InvalidMessageException {
        //Get the next message from the socket as a block of bytes.
        byte[] messageBytes = this.getNextMessageBytes();

        //Decode the binary message into an appropriate message object.
        BinaryMessageDecoder decoder = new BinaryMessageDecoder(messageBytes);
        return decoder.decode();
    }

    /**
     * Opens a new connection to the address and port of the current socket and switches this
     * instance over to it.
     *
     * <p>The input stream is re-wrapped around the new socket so that subsequent reads actually use the
     * new connection, and the previous socket is closed so it is not leaked. If anything fails, the
     * current socket and stream are left untouched.
     *
     * @throws IOException If the new connection cannot be established or its input stream cannot be opened.
     */
    private void reconnect() throws IOException {
        Socket oldSocket = this.connectionSocket;
        Socket newSocket = new Socket(oldSocket.getInetAddress(), oldSocket.getPort());

        DataInputStream newStream;
        try {
            newStream = new DataInputStream(newSocket.getInputStream());
        } catch (IOException e) {
            //Do not leak the half-initialised connection.
            try {
                newSocket.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }

        this.connectionSocket = newSocket;
        this.inStream = newStream;

        try {
            oldSocket.close();
        } catch (IOException ignored) {
            //The old connection is being discarded; a failure to close it is not actionable.
        }
    }

    /**
     * Sleeps for the given time. If the thread is interrupted while sleeping, the interrupt flag is
     * restored so that the receiver loop in {@link #run()} can observe it and stop.
     * @param millis Time to sleep, in milliseconds.
     */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Main loop which reads messages from the socket, and exposes them.
     *
     * <p>Each iteration first checks the heartbeat timer and attempts to reconnect if it has expired,
     * then reads and decodes one message and passes it to the race status. The loop ends when the
     * running thread is interrupted.
     *
     * <p>NOTE(review): reads block without a timeout, so the heartbeat check only runs between
     * messages; a peer that goes silent without closing the connection will not trigger a reconnect.
     */
    @Override
    public void run() {
        //Receiver loop that gets the input.
        while (!Thread.currentThread().isInterrupted()) {

            //If no heartbeat has been received in more than the heartbeat period
            //then the connection will need to be restarted.
            if (timeSinceHeartbeat() > HEARTBEAT_TIMEOUT_MS) {
                System.out.println("Connection has stopped, trying to reconnect.");
                try {
                    reconnect();
                    //Reset the heartbeat timer.
                    this.lastHeartbeatTime = System.currentTimeMillis();
                } catch (IOException e) {
                    System.err.println("Unable to reconnect.");
                    //Wait a little before the next attempt rather than hammering the remote host.
                    pause(RETRY_DELAY_MS);
                    continue;
                }
            }

            //Reads the next message.
            AC35Data message;
            try {
                message = this.getNextMessage();
            } catch (InvalidMessageException e) {
                //The bytes were read but could not be decoded: report and move on to the next message.
                System.err.println("Unable to read message: " + e.getMessage());
                continue;
            } catch (IOException e) {
                //The read itself failed (e.g., end of stream). Back off briefly so a dead connection
                //does not spin this loop; the heartbeat timeout above will trigger the reconnect.
                System.err.println("Unable to read message: " + e.getMessage());
                pause(RETRY_DELAY_MS);
                continue;
            }

            ac35RaceStatus.update(message);
        }
    }
}