WIP. Probably need to cherry pick stuff out of here.

Added ClientConnection and server-side handshake.
Added MessageSerialiser and Deserialiser.
#story[1095]
main
fjc40 8 years ago
parent 3ec87582d3
commit 7cc39abe57

@ -1,21 +1,22 @@
package mock.app; package mock.app;
import mock.enums.ConnectionStateEnum;
import mock.model.ClientConnection;
import mock.model.SourceIdAllocator;
import mock.model.commandFactory.CompositeCommand;
import network.Messages.Enums.XMLMessageType; import network.Messages.Enums.XMLMessageType;
import network.Messages.LatestMessages; import network.Messages.LatestMessages;
import network.Messages.RaceSnapshot;
import network.Messages.XMLMessage; import network.Messages.XMLMessage;
import org.mockito.Mock;
import visualiser.gameController.ControllerServer;
import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Array;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
/** /**
* Connection acceptor for multiple clients * Connection acceptor for multiple clients
@ -31,10 +32,31 @@ public class ConnectionAcceptor implements Runnable {
* Socket used to listen for clients on. * Socket used to listen for clients on.
*/ */
private ServerSocket serverSocket; private ServerSocket serverSocket;
//mock outputs
private ArrayBlockingQueue<MockOutput> mockOutputList = new ArrayBlockingQueue<>(16, true);
//latest messages /**
* List of client connections.
*/
private ArrayBlockingQueue<ClientConnection> clientConnections = new ArrayBlockingQueue<>(16, true);
/**
* Snapshot of the race.
*/
private LatestMessages latestMessages; private LatestMessages latestMessages;
/**
* Collection of commands from clients for race to execute.
*/
private CompositeCommand compositeCommand;
/**
* Used to allocate source IDs to clients.
*/
private SourceIdAllocator sourceIdAllocator;
//Acknowledgement number for packets //Acknowledgement number for packets
private int ackNumber = 0; private int ackNumber = 0;
//race xml sequence number //race xml sequence number
@ -47,14 +69,20 @@ public class ConnectionAcceptor implements Runnable {
/** /**
* Connection Acceptor Constructor * Connection Acceptor Constructor
* @param latestMessages Latest messages to be sent * @param latestMessages Latest messages to be sent
* @param compositeCommand Collection of commands for race to execute.
* @param sourceIdAllocator Object used to allocate source IDs for clients.
* @throws IOException if a server socket cannot be instantiated. * @throws IOException if a server socket cannot be instantiated.
*/ */
public ConnectionAcceptor(LatestMessages latestMessages) throws IOException { public ConnectionAcceptor(LatestMessages latestMessages, CompositeCommand compositeCommand, SourceIdAllocator sourceIdAllocator) throws IOException {
this.latestMessages = latestMessages; this.latestMessages = latestMessages;
this.compositeCommand = compositeCommand;
this.sourceIdAllocator = sourceIdAllocator;
this.serverSocket = new ServerSocket(serverPort); this.serverSocket = new ServerSocket(serverPort);
CheckClientConnection checkClientConnection = new CheckClientConnection(mockOutputList); CheckClientConnection checkClientConnection = new CheckClientConnection(clientConnections);
new Thread(checkClientConnection, "ConnectionAcceptor()->CheckClientConnection thread").start(); new Thread(checkClientConnection, "ConnectionAcceptor()->CheckClientConnection thread").start();
} }
public String getAddress() throws UnknownHostException { public String getAddress() throws UnknownHostException {
@ -71,28 +99,26 @@ public class ConnectionAcceptor implements Runnable {
@Override @Override
public void run() { public void run() {
while(mockOutputList.remainingCapacity() > 0) { while(clientConnections.remainingCapacity() > 0) {
try { try {
System.out.println("Waiting for a connection...");//TEMP DEBUG REMOVE System.out.println("Waiting for a connection...");//TEMP DEBUG REMOVE
Socket mockSocket = serverSocket.accept(); Socket mockSocket = serverSocket.accept();
//TODO at this point we need to assign the connection a boat source ID, if they requested to participate.
DataOutputStream outToVisualiser = new DataOutputStream(mockSocket.getOutputStream()); ClientConnection clientConnection = new ClientConnection(mockSocket, sourceIdAllocator, latestMessages, compositeCommand);
MockOutput mockOutput = new MockOutput(latestMessages, outToVisualiser);
ControllerServer controllerServer = new ControllerServer(mockSocket); //TODO probably pass assigned boat source ID into ControllerServer. clientConnections.add(clientConnection);
new Thread(clientConnection, "ConnectionAcceptor.run()->ClientConnection thread " + clientConnection).start();
new Thread(mockOutput, "ConnectionAcceptor.run()->MockOutput thread" + mockOutput).start();
new Thread(controllerServer, "ConnectionAcceptor.run()->ControllerServer thread" + controllerServer).start();
mockOutputList.add(mockOutput);
System.out.println(String.format("%d number of Visualisers Connected.", mockOutputList.size()));//TEMP Logger.getGlobal().log(Level.INFO, String.format("%d number of Visualisers Connected.", clientConnections.size()));
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace();//TODO handle this properly Logger.getGlobal().log(Level.WARNING, "Got an IOException while a client was attempting to connect.", e);
} }
@ -104,14 +130,14 @@ public class ConnectionAcceptor implements Runnable {
*/ */
class CheckClientConnection implements Runnable{ class CheckClientConnection implements Runnable{
private ArrayBlockingQueue<MockOutput> mocks; private ArrayBlockingQueue<ClientConnection> connections;
/** /**
* Constructor * Constructor
* @param mocks Mocks "connected" * @param connections Clients "connected"
*/ */
public CheckClientConnection(ArrayBlockingQueue<MockOutput> mocks){ public CheckClientConnection(ArrayBlockingQueue<ClientConnection> connections){
this.mocks = mocks; this.connections = connections;
} }
/** /**
@ -119,21 +145,27 @@ public class ConnectionAcceptor implements Runnable {
*/ */
@Override @Override
public void run() { public void run() {
double timeSinceLastHeartBeat = System.currentTimeMillis();
while(true) { while(true) {
//System.out.println(mocks.size());//used to see current amount of visualisers connected. //System.out.println(connections.size());//used to see current amount of visualisers connected.
ArrayBlockingQueue<MockOutput> m = new ArrayBlockingQueue<>(16, true, mocks); ArrayBlockingQueue<ClientConnection> clientConnections = new ArrayBlockingQueue<>(16, true, connections);
for (MockOutput mo : m) {
try { for (ClientConnection client : clientConnections) {
mo.sendHeartBeat(); if (!client.isAlive()) {
} catch (IOException e) { connections.remove(client);
mocks.remove(mo);
Logger.getGlobal().log(Level.WARNING, "CheckClientConnection is removing the dead connection: " + client);
} }
} }
try { try {
Thread.sleep(100); Thread.sleep(100);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); Logger.getGlobal().log(Level.WARNING, "CheckClientConnection was interrupted while sleeping.", e);
Thread.currentThread().interrupt();
return;
} }
} }
} }

@ -1,10 +1,14 @@
package mock.app; package mock.app;
import mock.dataInput.PolarParser; import mock.dataInput.PolarParser;
import mock.exceptions.EventConstructionException;
import mock.model.MockRace; import mock.model.MockRace;
import mock.model.Polars; import mock.model.Polars;
import mock.model.RaceLogic; import mock.model.RaceLogic;
import mock.model.SourceIdAllocator;
import mock.model.commandFactory.CompositeCommand;
import network.Messages.LatestMessages; import network.Messages.LatestMessages;
import network.Messages.RaceSnapshot;
import shared.dataInput.*; import shared.dataInput.*;
import shared.enums.XMLFileType; import shared.enums.XMLFileType;
import shared.exceptions.InvalidBoatDataException; import shared.exceptions.InvalidBoatDataException;
@ -19,14 +23,18 @@ import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.logging.Level;
import java.util.logging.Logger;
/** /**
* A Race Event, this holds all of the race's information as well as handling the connection to its clients. * A Race Event, this holds all of the race's information as well as handling the connection to its clients.
*/ */
public class Event { public class Event {
private static Event theEvent = new Event();
/**
* Contents of the various xml files.
*/
private String raceXML; private String raceXML;
private String regattaXML; private String regattaXML;
private String boatXML; private String boatXML;
@ -35,36 +43,75 @@ public class Event {
private Polars boatPolars; private Polars boatPolars;
/**
* Data sources containing data from the xml files.
*/
RaceDataSource raceDataSource;
BoatDataSource boatDataSource;
RegattaDataSource regattaDataSource;
private ConnectionAcceptor connectionAcceptor; private ConnectionAcceptor connectionAcceptor;
private LatestMessages latestMessages; private LatestMessages latestMessages;
private CompositeCommand compositeCommand;
/**
* This is used to allocate source IDs.
*/
private SourceIdAllocator sourceIdAllocator;
/** /**
* Constructs an event, using various XML files. * Constructs an event, using various XML files.
* @throws EventConstructionException Thrown if we cannot create an Event for any reason.
*/ */
private Event() { public Event() throws EventConstructionException {
//Read XML files.
try { try {
this.raceXML = getRaceXMLAtCurrentTime(XMLReader.readXMLFileToString("mock/mockXML/raceTest.xml", StandardCharsets.UTF_8)); this.raceXML = getRaceXMLAtCurrentTime(XMLReader.readXMLFileToString("mock/mockXML/raceTest.xml", StandardCharsets.UTF_8));
this.boatXML = XMLReader.readXMLFileToString("mock/mockXML/boatsSinglePlayer.xml", StandardCharsets.UTF_8); this.boatXML = XMLReader.readXMLFileToString("mock/mockXML/boatsSinglePlayer.xml", StandardCharsets.UTF_8);
this.regattaXML = XMLReader.readXMLFileToString("mock/mockXML/regattaTest.xml", StandardCharsets.UTF_8); this.regattaXML = XMLReader.readXMLFileToString("mock/mockXML/regattaTest.xml", StandardCharsets.UTF_8);
this.xmlFileType = XMLFileType.Contents;
this.boatPolars = PolarParser.parse("mock/polars/acc_polars.csv"); } catch (TransformerException | XMLReaderException e) {
throw new EventConstructionException("Could not read XML files.", e);
}
this.xmlFileType = XMLFileType.Contents;
this.boatPolars = PolarParser.parse("mock/polars/acc_polars.csv");
//Parse the XML files into data sources.
try {
this.raceDataSource = new RaceXMLReader(this.raceXML, this.xmlFileType);
this.boatDataSource = new BoatXMLReader(this.boatXML, this.xmlFileType);
this.regattaDataSource = new RegattaXMLReader(this.regattaXML, this.xmlFileType);
} catch (XMLReaderException | InvalidRaceDataException | InvalidRegattaDataException | InvalidBoatDataException e) {
throw new EventConstructionException("Could not parse XML files.", e);
this.latestMessages = new LatestMessages();
this.connectionAcceptor = new ConnectionAcceptor(latestMessages);
} }
catch (IOException e) {
e.printStackTrace(); //Create connection acceptor.
} catch (XMLReaderException e) { this.sourceIdAllocator = new SourceIdAllocator(raceDataSource.getParticipants());
e.printStackTrace(); this.compositeCommand = new CompositeCommand();
} catch (TransformerException e) { this.latestMessages = new LatestMessages();
e.printStackTrace();
try {
this.connectionAcceptor = new ConnectionAcceptor(latestMessages, compositeCommand, sourceIdAllocator);
} catch (IOException e) {
throw new EventConstructionException("Could not create ConnectionAcceptor.", e);
} }
} }
public static Event getEvent() {
return theEvent;
}
public String getAddress() throws UnknownHostException { public String getAddress() throws UnknownHostException {
return connectionAcceptor.getAddress(); return connectionAcceptor.getAddress();
@ -76,23 +123,16 @@ public class Event {
/** /**
* Sends the initial race data and then begins race simulation. * Sends the initial race data and then begins race simulation.
* @throws InvalidRaceDataException Thrown if the race xml file cannot be parsed.
* @throws XMLReaderException Thrown if any of the xml files cannot be parsed.
* @throws InvalidBoatDataException Thrown if the boat xml file cannot be parsed.
* @throws InvalidRegattaDataException Thrown if the regatta xml file cannot be parsed.
*/ */
public void start() throws InvalidRaceDataException, XMLReaderException, InvalidBoatDataException, InvalidRegattaDataException { public void start() {
new Thread(connectionAcceptor, "Event.Start()->ConnectionAcceptor thread").start(); new Thread(connectionAcceptor, "Event.Start()->ConnectionAcceptor thread").start();
sendXMLs(); sendXMLs();
//Parse the XML files into data sources.
RaceDataSource raceDataSource = new RaceXMLReader(this.raceXML, this.xmlFileType);
BoatDataSource boatDataSource = new BoatXMLReader(this.boatXML, this.xmlFileType);
RegattaDataSource regattaDataSource = new RegattaXMLReader(this.regattaXML, this.xmlFileType);
//Create and start race. //Create and start race.
RaceLogic newRace = new RaceLogic(new MockRace(boatDataSource, raceDataSource, regattaDataSource, this.latestMessages, this.boatPolars, Constants.RaceTimeScale), this.latestMessages); RaceLogic newRace = new RaceLogic(new MockRace(boatDataSource, raceDataSource, regattaDataSource, this.latestMessages, this.boatPolars, Constants.RaceTimeScale), this.latestMessages, this.compositeCommand);
new Thread(newRace, "Event.Start()->RaceLogic thread").start(); new Thread(newRace, "Event.Start()->RaceLogic thread").start();
} }

@ -2,37 +2,25 @@ package mock.app;
import network.BinaryMessageEncoder;
import network.Exceptions.InvalidMessageException;
import network.MessageEncoders.RaceVisionByteEncoder;
import network.Messages.*; import network.Messages.*;
import network.Messages.Enums.MessageType; import shared.model.RunnableWithFramePeriod;
import java.io.DataOutputStream; import java.util.List;
import java.io.IOException; import java.util.concurrent.BlockingQueue;
import java.net.SocketException;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
/** /**
* TCP server to send race information to connected clients. * TCP server to send race information to connected clients.
*/ */
public class MockOutput implements Runnable public class MockOutput implements RunnableWithFramePeriod {
{
/**
* Timestamp of the last sent heartbeat message.
*/
private long lastHeartbeatTime;
/**
* Period for the heartbeat - that is, how often we send it.
*/
private double heartbeatPeriod = 5.0;
/** /**
* Output stream which wraps around mockSocket outstream. * A queue to send messages to client.
*/ */
private DataOutputStream outToVisualiser; private BlockingQueue<AC35Data> outgoingMessages;
/** /**
@ -43,187 +31,21 @@ public class MockOutput implements Runnable
/**
* Ack numbers used in messages.
*/
private int ackNumber = 1;
/**
* Sequence number for heartbeat messages.
*/
private int heartbeatSequenceNum = 1;
/** /**
* Ctor. * Ctor.
* @param latestMessages Latests Messages that the Mock is to send out * @param latestMessages Latest Messages that the Mock is to send out
* @param outToVisualiser DataStream to output to Visualisers * @param outgoingMessages A queue to place outgoing messages on.
* @throws IOException if server socket cannot be opened.
*/ */
public MockOutput(LatestMessages latestMessages, DataOutputStream outToVisualiser) throws IOException { public MockOutput(LatestMessages latestMessages, BlockingQueue<AC35Data> outgoingMessages) {
this.outgoingMessages = outgoingMessages;
this.outToVisualiser = outToVisualiser;
this.lastHeartbeatTime = System.currentTimeMillis();
this.latestMessages = latestMessages; this.latestMessages = latestMessages;
}
/**
* Increments the ackNumber value, and returns it.
* @return Incremented ackNumber.
*/
private int getNextAckNumber(){
this.ackNumber++;
return this.ackNumber;
}
/**
* Calculates the time since last heartbeat message, in seconds.
* @return Time since last heartbeat message, in seconds.
*/
private double timeSinceHeartbeat() {
long now = System.currentTimeMillis();
return (now - lastHeartbeatTime) / 1000.0;
}
/**
* Generates the next heartbeat message and returns it. Increments the heartbeat sequence number.
* @return The next heartbeat message.
*/
private HeartBeat createHeartbeatMessage() {
//Create the heartbeat message.
HeartBeat heartBeat = new HeartBeat(this.heartbeatSequenceNum);
heartbeatSequenceNum++;
return heartBeat;
}
/**
* Serializes a heartbeat message into a packet to be sent, and returns the byte array.
* @param heartBeat The heartbeat message to serialize.
* @return Byte array containing the next heartbeat message.
* @throws InvalidMessageException Thrown if the message cannot be encoded.
*/
private byte[] parseHeartbeat(HeartBeat heartBeat) throws InvalidMessageException {
//Serializes the heartbeat message.
byte[] heartbeatMessage = RaceVisionByteEncoder.encode(heartBeat);
//Places the serialized message in a packet.
BinaryMessageEncoder binaryMessageEncoder = new BinaryMessageEncoder(
MessageType.HEARTBEAT,
System.currentTimeMillis(),
getNextAckNumber(),
(short) heartbeatMessage.length,
heartbeatMessage );
return binaryMessageEncoder.getFullMessage();
}
/**
* Encodes/serialises a XMLMessage message, and returns it.
* @param xmlMessage The XMLMessage message to serialise.
* @return The XMLMessage message in a serialised form.
* @throws InvalidMessageException Thrown if the message cannot be encoded.
*/
private synchronized byte[] parseXMLMessage(XMLMessage xmlMessage) throws InvalidMessageException {
//Serialize the xml message.
byte[] encodedXML = RaceVisionByteEncoder.encode(xmlMessage);
//Place the message in a packet.
BinaryMessageEncoder binaryMessageEncoder = new BinaryMessageEncoder(
MessageType.XMLMESSAGE,
System.currentTimeMillis(),
xmlMessage.getAckNumber(), //We use the ack number from the xml message.
(short) encodedXML.length,
encodedXML );
return binaryMessageEncoder.getFullMessage();
} }
/**
* Encodes/serialises a BoatLocation message, and returns it.
* @param boatLocation The BoatLocation message to serialise.
* @return The BoatLocation message in a serialised form.
* @throws InvalidMessageException If the message cannot be encoded.
*/
private synchronized byte[] parseBoatLocation(BoatLocation boatLocation) throws InvalidMessageException {
//Encodes the message.
byte[] encodedBoatLoc = RaceVisionByteEncoder.encode(boatLocation);
//Encodes the full message with header.
BinaryMessageEncoder binaryMessageEncoder = new BinaryMessageEncoder(
MessageType.BOATLOCATION,
System.currentTimeMillis(),
getNextAckNumber(),
(short) encodedBoatLoc.length,
encodedBoatLoc );
return binaryMessageEncoder.getFullMessage();
}
/**
* Encodes/serialises a RaceStatus message, and returns it.
* @param raceStatus The RaceStatus message to serialise.
* @return The RaceStatus message in a serialised form.
* @throws InvalidMessageException Thrown if the message cannot be encoded.
*/
private synchronized byte[] parseRaceStatus(RaceStatus raceStatus) throws InvalidMessageException {
//Encodes the messages.
byte[] encodedRaceStatus = RaceVisionByteEncoder.encode(raceStatus);
//Encodes the full message with header.
BinaryMessageEncoder binaryMessageEncoder = new BinaryMessageEncoder(
MessageType.RACESTATUS,
System.currentTimeMillis(),
getNextAckNumber(),
(short) encodedRaceStatus.length,
encodedRaceStatus );
return binaryMessageEncoder.getFullMessage();
}
/**
* Sends a heartbeat
* @throws IOException if the socket is no longer open at both ends the heartbeat returns an error.
*/
public void sendHeartBeat() throws IOException {
//Sends a heartbeat every so often.
if (timeSinceHeartbeat() >= heartbeatPeriod) {
HeartBeat heartBeat = createHeartbeatMessage();
try {
outToVisualiser.write(parseHeartbeat(heartBeat));
} catch (InvalidMessageException e) {
Logger.getGlobal().log(Level.WARNING, "Could not encode HeartBeat: " + heartBeat, e);
}
lastHeartbeatTime = System.currentTimeMillis();
}
}
/** /**
* Sending loop of the Server * Sending loop of the Server
*/ */
@ -251,107 +73,40 @@ public class MockOutput implements Runnable
long previousFrameTime = System.currentTimeMillis(); long previousFrameTime = System.currentTimeMillis();
boolean sentXMLs = false; boolean sentXMLs = false;
try {
while (!Thread.interrupted()) {
try {
long currentFrameTime = System.currentTimeMillis(); while (!Thread.interrupted()) {
//This is the time elapsed, in milliseconds, since the last server "frame". try {
long framePeriod = currentFrameTime - previousFrameTime;
//We only attempt to send packets every X milliseconds.
long minimumFramePeriod = 16;
if (framePeriod >= minimumFramePeriod) {
//Send XML messages.
if (!sentXMLs) {
//Serialise them.
try {
byte[] raceXMLBlob = parseXMLMessage(latestMessages.getRaceXMLMessage());
byte[] regattaXMLBlob = parseXMLMessage(latestMessages.getRegattaXMLMessage());
byte[] boatsXMLBlob = parseXMLMessage(latestMessages.getBoatXMLMessage());
//Send them.
outToVisualiser.write(raceXMLBlob);
outToVisualiser.write(regattaXMLBlob);
outToVisualiser.write(boatsXMLBlob);
sentXMLs = true;
} catch (InvalidMessageException e) {
Logger.getGlobal().log(Level.WARNING, "Could not encode XMLMessage: " + latestMessages.getRaceXMLMessage(), e);
continue; //Go to next iteration.
}
}
//Sends the RaceStatus message.
if (this.latestMessages.getRaceStatus() != null) {
try {
byte[] raceStatusBlob = this.parseRaceStatus(this.latestMessages.getRaceStatus());
this.outToVisualiser.write(raceStatusBlob);
} catch (InvalidMessageException e) {
Logger.getGlobal().log(Level.WARNING, "Could not encode RaceStatus: " + latestMessages.getRaceStatus(), e);
}
}
//Send all of the BoatLocation messages.
for (int sourceID : this.latestMessages.getBoatLocationMap().keySet()) {
//Get the message.
BoatLocation boatLocation = this.latestMessages.getBoatLocation(sourceID);
if (boatLocation != null) {
try {
//Encode.
byte[] boatLocationBlob = this.parseBoatLocation(boatLocation);
//Write it.
this.outToVisualiser.write(boatLocationBlob);
} catch (InvalidMessageException e) {
Logger.getGlobal().log(Level.WARNING, "Could not encode BoatLocation: " + boatLocation, e);
}
}
}
previousFrameTime = currentFrameTime; long currentFrameTime = System.currentTimeMillis();
waitForFramePeriod(previousFrameTime, currentFrameTime, 16);
previousFrameTime = currentFrameTime;
} else { //Send XML messages.
//Wait until the frame period will be large enough. if (!sentXMLs) {
long timeToWait = minimumFramePeriod - framePeriod;
try { outgoingMessages.put(latestMessages.getRaceXMLMessage());
Thread.sleep(timeToWait); outgoingMessages.put(latestMessages.getRegattaXMLMessage());
} catch (InterruptedException e) { outgoingMessages.put(latestMessages.getBoatXMLMessage());
//If we get interrupted, exit the function.
Logger.getGlobal().log(Level.WARNING, "MockOutput.run().sleep(framePeriod) was interrupted on thread: " + Thread.currentThread(), e);
//Re-set the interrupt flag.
Thread.currentThread().interrupt();
return;
}
} sentXMLs = true;
}
} catch (SocketException e) { List<AC35Data> snapshot = latestMessages.getSnapshot();
break; for (AC35Data message : snapshot) {
outgoingMessages.put(message);
} }
} catch (InterruptedException e) {
Logger.getGlobal().log(Level.WARNING, "MockOutput.run() interrupted while putting message in queue.", e);
Thread.currentThread().interrupt();
return;
} }
} catch (IOException e) {
e.printStackTrace();
} }
} }

@ -0,0 +1,84 @@
package mock.enums;
import java.util.HashMap;
import java.util.Map;
/**
* The states in which a connection to a client may have.
*/
public enum ConnectionStateEnum {
UNKNOWN(0),
/**
* We're waiting for the client to complete the joining handshake (see {@link network.Messages.RequestToJoin}.
*/
WAITING_FOR_HANDSHAKE(1),
/**
* The client has completed the handshake, and is connected.
*/
CONNECTED(2),
/**
* The client has timed out.
*/
TIMED_OUT(3);
private byte value;
/**
* Ctor. Creates a ConnectionStateEnum from a given primitive integer value, cast to a byte.
* @param value Integer, which is cast to byte, to construct from.
*/
private ConnectionStateEnum(int value) {
this.value = (byte) value;
}
/**
* Returns the primitive value of the enum.
* @return Primitive value of the enum.
*/
public byte getValue() {
return value;
}
/**
* Stores a mapping between Byte values and ConnectionStateEnum values.
*/
private static final Map<Byte, ConnectionStateEnum> byteToStatusMap = new HashMap<>();
/*
Static initialization block. Initializes the byteToStatusMap.
*/
static {
for (ConnectionStateEnum type : ConnectionStateEnum.values()) {
ConnectionStateEnum.byteToStatusMap.put(type.value, type);
}
}
/**
* Returns the enumeration value which corresponds to a given byte value.
* @param connectionState Byte value to convert to a ConnectionStateEnum value.
* @return The ConnectionStateEnum value which corresponds to the given byte value.
*/
public static ConnectionStateEnum fromByte(byte connectionState) {
//Gets the corresponding MessageType from the map.
ConnectionStateEnum type = ConnectionStateEnum.byteToStatusMap.get(connectionState);
if (type == null) {
//If the byte value wasn't found, return the UNKNOWN connectionState.
return ConnectionStateEnum.UNKNOWN;
} else {
//Otherwise, return the connectionState.
return type;
}
}
}

@ -0,0 +1,24 @@
package mock.exceptions;
/**
* An exception thrown when we cannot create an {@link mock.app.Event}.
*/
public class EventConstructionException extends Exception {
/**
* Constructs the exception with a given message.
* @param message Message to store.
*/
public EventConstructionException(String message) {
super(message);
}
/**
* Constructs the exception with a given message and cause.
* @param message Message to store.
* @param cause Cause to store.
*/
public EventConstructionException(String message, Throwable cause) {
super(message, cause);
}
}

@ -0,0 +1,24 @@
package mock.exceptions;
/**
* An exception thrown when we cannot allocate a source ID.
*/
public class SourceIDAllocationException extends Exception {
/**
* Constructs the exception with a given message.
* @param message Message to store.
*/
public SourceIDAllocationException(String message) {
super(message);
}
/**
* Constructs the exception with a given message and cause.
* @param message Message to store.
* @param cause Cause to store.
*/
public SourceIDAllocationException(String message, Throwable cause) {
super(message, cause);
}
}

@ -0,0 +1,243 @@
package mock.model;
import mock.app.MockOutput;
import mock.enums.ConnectionStateEnum;
import shared.exceptions.HandshakeException;
import mock.exceptions.SourceIDAllocationException;
import mock.model.commandFactory.CompositeCommand;
import network.Messages.*;
import network.Messages.Enums.JoinAcceptanceEnum;
import network.Messages.Enums.MessageType;
import network.Messages.Enums.RequestToJoinEnum;
import network.StreamRelated.MessageDeserialiser;
import network.StreamRelated.MessageSerialiser;
import visualiser.gameController.ControllerServer;
import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* This class handles the client connection handshake, and creation of MockOutput and ControllerServer.
*/
public class ClientConnection implements Runnable {
/**
* The socket for the client's connection.
*/
private Socket socket;
/**
* Periodically sends HeartBeat messages to client.
*/
private HeartBeatService heartBeatService;
/**
* Used to allocate source ID to client, if they request to participate.
*/
private SourceIdAllocator sourceIdAllocator;
/**
* Latest snapshot of the race, to send to client. Currently only used for XML messages.
*/
private LatestMessages latestMessages;
/**
* Collection of commands from client for race to execute.
*/
private CompositeCommand compositeCommand;
/**
* Used to send the race snapshot to client.
*/
private MockOutput mockOutput;
/**
* Used to receive client input, and turn it into commands.
*/
private ControllerServer controllerServer;
/**
* Used to write messages to socket.
*/
private MessageSerialiser messageSerialiser;
/**
* Stores messages to write to socket.
*/
private BlockingQueue<AC35Data> outputQueue;
/**
* Used to read messages from socket.
*/
private MessageDeserialiser messageDeserialiser;
/**
* Stores messages read from socket.
*/
private BlockingQueue<AC35Data> inputQueue;
/**
* The state of the connection to the client.
*/
private ConnectionStateEnum connectionState = ConnectionStateEnum.UNKNOWN;
/**
* Creates a client connection, using a given socket.
* @param socket The socket which connects to the client.
* @param sourceIdAllocator Used to allocate a source ID for the client.
* @param latestMessages Latest race snapshot to send to client.
* @param compositeCommand Collection of commands for race to execute.
* @throws IOException Thrown if there is a problem with the client socket.
*/
public ClientConnection(Socket socket, SourceIdAllocator sourceIdAllocator, LatestMessages latestMessages, CompositeCommand compositeCommand) throws IOException {
this.socket = socket;
this.sourceIdAllocator = sourceIdAllocator;
this.latestMessages = latestMessages;
this.compositeCommand = compositeCommand;
this.outputQueue = new LinkedBlockingQueue<>();
this.inputQueue = new LinkedBlockingQueue<>();
this.messageSerialiser = new MessageSerialiser(socket.getOutputStream(), outputQueue);
this.messageDeserialiser = new MessageDeserialiser(socket.getInputStream(), inputQueue);
new Thread(messageSerialiser, "ClientConnection()->MessageSerialiser thread " + messageSerialiser).start();
new Thread(messageDeserialiser, "ClientConnection()->MessageDeserialiser thread " + messageDeserialiser).start();
this.heartBeatService = new HeartBeatService(outputQueue);
new Thread(heartBeatService, "ClientConnection()->HeartBeatService thread " + heartBeatService).start();
}
@Override
public void run() {
try {
handshake();
} catch (HandshakeException | SourceIDAllocationException e) {
Logger.getGlobal().log(Level.WARNING, "Client handshake failed.", e);
Thread.currentThread().interrupt();
return;
}
}
/**
* Initiates the handshake with the client.
* @throws HandshakeException Thrown if something goes wrong with the handshake.
* @throws SourceIDAllocationException Thrown if we cannot allocate a sourceID.
*/
private void handshake() throws SourceIDAllocationException, HandshakeException {
//This function is a bit messy, and could probably be refactored a bit.
connectionState = ConnectionStateEnum.WAITING_FOR_HANDSHAKE;
RequestToJoin requestToJoin = waitForRequestToJoin();
int allocatedSourceID = 0;
//If they want to participate, give them a source ID number.
if (requestToJoin.getRequestType() == RequestToJoinEnum.PARTICIPANT) {
allocatedSourceID = sourceIdAllocator.allocateSourceID();
this.controllerServer = new ControllerServer(compositeCommand, inputQueue, allocatedSourceID);
new Thread(controllerServer, "ClientConnection.run()->ControllerServer thread" + controllerServer).start();
}
this.mockOutput = new MockOutput(latestMessages, outputQueue);
new Thread(mockOutput, "ClientConnection.run()->MockOutput thread" + mockOutput).start();
sendJoinAcceptanceMessage(allocatedSourceID);
connectionState = ConnectionStateEnum.CONNECTED;
}
/**
* Waits until the client sends a {@link RequestToJoin} message, and returns it.
* @return The {@link RequestToJoin} message.
* @throws HandshakeException Thrown if we get interrupted while waiting.
*/
private RequestToJoin waitForRequestToJoin() throws HandshakeException {
try {
while (connectionState == ConnectionStateEnum.WAITING_FOR_HANDSHAKE) {
AC35Data message = inputQueue.take();
//We need to wait until they actually send a join request.
if (message.getType() == MessageType.REQUEST_TO_JOIN) {
return (RequestToJoin) message;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new HandshakeException("Handshake failed. Thread: " + Thread.currentThread() + " was interrupted while waiting on the incoming message queue.", e);
}
throw new HandshakeException("Handshake was cancelled. Connection state is now: " + connectionState);
}
/**
* Sends the client a {@link JoinAcceptance} message, containing their assigned sourceID.
* @param sourceID The sourceID to assign to client.
* @throws HandshakeException Thrown if the thread is interrupted while placing message on the outgoing message queue.
*/
private void sendJoinAcceptanceMessage(int sourceID) throws HandshakeException {
//Send them the source ID.
JoinAcceptance joinAcceptance = new JoinAcceptance(JoinAcceptanceEnum.JOIN_SUCCESSFUL, sourceID);
try {
outputQueue.put(joinAcceptance);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new HandshakeException("Handshake failed. Thread: " + Thread.currentThread() + " interrupted while placing JoinAcceptance message on outgoing message queue.", e);
}
}
/**
* Determines whether or not this connection is still alive.
* This is based off whether the {@link MessageSerialiser} is still alive.
* @return True if it is alive, false otherwise.
*/
public boolean isAlive() {
return messageSerialiser.isRunning();
}
}

@ -0,0 +1,110 @@
package mock.model;
import network.Messages.AC35Data;
import network.Messages.HeartBeat;
import shared.model.RunnableWithFramePeriod;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* This class is responsible for sending {@link HeartBeat} messages to queue.
*/
public class HeartBeatService implements RunnableWithFramePeriod {
/**
* Timestamp of the last sent heartbeat message.
*/
private long lastHeartbeatTime;
/**
* Period for the heartbeat - that is, how often we send it. Milliseconds.
*/
private long heartbeatPeriod = 5000;
/**
* The messages we're writing to the stream.
*/
private BlockingQueue<AC35Data> messagesToSend;
/**
* Sequence number for heartbeat messages.
*/
private int heartbeatSequenceNum = 1;
/**
* Constructs a new HeartBeatService to send heartBeat messages to a given outputStream.
* @param messagesToSend The queue to send heartBeat messages to.
*/
public HeartBeatService(BlockingQueue<AC35Data> messagesToSend) {
this.messagesToSend = messagesToSend;
this.lastHeartbeatTime = System.currentTimeMillis();
}
/**
* Increments the {@link #heartbeatSequenceNum} value, and returns it.
* @return Incremented heat beat number.
*/
private int getNextHeartBeatNumber(){
this.heartbeatSequenceNum++;
return this.heartbeatSequenceNum;
}
/**
* Generates the next heartbeat message and returns it. Increments the heartbeat sequence number.
* @return The next heartbeat message.
*/
private HeartBeat createHeartbeatMessage() {
HeartBeat heartBeat = new HeartBeat(getNextHeartBeatNumber());
return heartBeat;
}
/**
* Puts a HeartBeat message on the message queue.
* @throws InterruptedException Thrown if the thread is interrupted.
*/
private void sendHeartBeat() throws InterruptedException {
HeartBeat heartBeat = createHeartbeatMessage();
messagesToSend.put(heartBeat);
}
@Override
public void run() {
while (!Thread.interrupted()) {
long currentFrameTime = System.currentTimeMillis();
waitForFramePeriod(lastHeartbeatTime, currentFrameTime, heartbeatPeriod);
lastHeartbeatTime = currentFrameTime;
try {
sendHeartBeat();
} catch (InterruptedException e) {
Logger.getGlobal().log(Level.WARNING, "HeartBeatService: " + this + " sendHeartBeat() was interrupted on thread: " + Thread.currentThread(), e);
Thread.currentThread().interrupt();
return;
}
}
}
}

@ -7,6 +7,7 @@ import shared.dataInput.BoatDataSource;
import shared.dataInput.RaceDataSource; import shared.dataInput.RaceDataSource;
import network.Messages.Enums.RaceStatusEnum; import network.Messages.Enums.RaceStatusEnum;
import shared.dataInput.RegattaDataSource; import shared.dataInput.RegattaDataSource;
import shared.exceptions.BoatNotFoundException;
import shared.model.*; import shared.model.*;
import shared.model.Bearing; import shared.model.Bearing;
@ -411,6 +412,25 @@ public class MockRace extends Race {
return boats; return boats;
} }
/**
* Returns a boat by sourceID.
* @param sourceID The source ID the boat.
* @return The boat.
* @throws BoatNotFoundException Thrown if there is not boat with the specified sourceID.
*/
public MockBoat getBoat(int sourceID) throws BoatNotFoundException {
for (MockBoat boat : boats) {
if (boat.getSourceID() == sourceID) {
return boat;
}
}
throw new BoatNotFoundException("Boat with sourceID: " + sourceID + " was not found.");
}
/** /**
* Changes the wind direction randomly, while keeping it within [windLowerBound, windUpperBound]. * Changes the wind direction randomly, while keeping it within [windLowerBound, windUpperBound].
*/ */

@ -1,12 +1,15 @@
package mock.model; package mock.model;
import javafx.animation.AnimationTimer; import javafx.animation.AnimationTimer;
import mock.model.commandFactory.CompositeCommand;
import network.Messages.Enums.BoatStatusEnum; import network.Messages.Enums.BoatStatusEnum;
import network.Messages.Enums.RaceStatusEnum; import network.Messages.Enums.RaceStatusEnum;
import network.Messages.LatestMessages; import network.Messages.LatestMessages;
import shared.model.Race;
public class RaceLogic implements Runnable { public class RaceLogic implements Runnable {
/** /**
* State of current race modified by this object * State of current race modified by this object
*/ */
@ -16,14 +19,18 @@ public class RaceLogic implements Runnable {
*/ */
private RaceServer server; private RaceServer server;
private CompositeCommand commands;
/** /**
* Initialises race loop with state and server message queue * Initialises race loop with state and server message queue
* @param race state of race to modify * @param race state of race to modify
* @param messages to send to server * @param messages to send to server
* @param compositeCommand Commands from clients to execute.
*/ */
public RaceLogic(MockRace race, LatestMessages messages) { public RaceLogic(MockRace race, LatestMessages messages, CompositeCommand compositeCommand) {
this.race = race; this.race = race;
this.server = new RaceServer(race, messages); this.server = new RaceServer(race, messages);
this.commands = compositeCommand;
} }
/** /**
@ -56,17 +63,13 @@ public class RaceLogic implements Runnable {
//Provide boat's with an estimated time at next mark until the race starts. //Provide boat's with an estimated time at next mark until the race starts.
race.setBoatsTimeNextMark(race.getRaceClock().getCurrentTime()); race.setBoatsTimeNextMark(race.getRaceClock().getCurrentTime());
//Parse the boat locations. //Parse the race snapshot.
server.parseBoatLocations(); server.parseSnapshot();
//Parse the marks.
server.parseMarks();
// Change wind direction // Change wind direction
race.changeWindDirection(); race.changeWindDirection();
//Parse the race status.
server.parseRaceStatus();
if (race.getRaceStatusEnum() == RaceStatusEnum.STARTED) { if (race.getRaceStatusEnum() == RaceStatusEnum.STARTED) {
@ -109,6 +112,9 @@ public class RaceLogic implements Runnable {
//Get the current time. //Get the current time.
currentTime = System.currentTimeMillis(); currentTime = System.currentTimeMillis();
//Execute commands from clients.
commands.execute(race);
//Update race time. //Update race time.
race.updateRaceTime(currentTime); race.updateRaceTime(currentTime);
@ -123,7 +129,6 @@ public class RaceLogic implements Runnable {
//If it is still racing, update its position. //If it is still racing, update its position.
if (boat.getStatus() == BoatStatusEnum.RACING) { if (boat.getStatus() == BoatStatusEnum.RACING) {
race.updatePosition(boat, framePeriod, race.getRaceClock().getDurationMilli()); race.updatePosition(boat, framePeriod, race.getRaceClock().getDurationMilli());
} }
@ -141,15 +146,8 @@ public class RaceLogic implements Runnable {
// Change wind direction // Change wind direction
race.changeWindDirection(); race.changeWindDirection();
//Parse the boat locations. //Parse the race snapshot.
server.parseBoatLocations(); server.parseSnapshot();
//Parse the marks.
server.parseMarks();
//Parse the race status.
server.parseRaceStatus();
//Update the last frame time. //Update the last frame time.
this.lastFrameTime = currentTime; this.lastFrameTime = currentTime;
@ -165,7 +163,7 @@ public class RaceLogic implements Runnable {
@Override @Override
public void handle(long now) { public void handle(long now) {
server.parseRaceStatus(); server.parseSnapshot();
if (iters > 500) { if (iters > 500) {
stop(); stop();
@ -173,4 +171,5 @@ public class RaceLogic implements Runnable {
iters++; iters++;
} }
}; };
} }

@ -1,10 +1,7 @@
package mock.model; package mock.model;
import network.Messages.BoatLocation; import network.Messages.*;
import network.Messages.BoatStatus;
import network.Messages.Enums.BoatLocationDeviceEnum; import network.Messages.Enums.BoatLocationDeviceEnum;
import network.Messages.LatestMessages;
import network.Messages.RaceStatus;
import network.Utils.AC35UnitConverter; import network.Utils.AC35UnitConverter;
import shared.model.Bearing; import shared.model.Bearing;
import shared.model.CompoundMark; import shared.model.CompoundMark;
@ -21,10 +18,6 @@ public class RaceServer {
private MockRace race; private MockRace race;
private LatestMessages latestMessages; private LatestMessages latestMessages;
/**
* The sequence number of the latest RaceStatus message sent or received.
*/
private int raceStatusSequenceNumber = 1;
/** /**
* The sequence number of the latest BoatLocation message sent or received. * The sequence number of the latest BoatLocation message sent or received.
@ -39,10 +32,31 @@ public class RaceServer {
/** /**
* Parses an individual marker boat, and sends it to mockOutput. * Parses the race to create a snapshot, and places it in latestMessages.
*/
public void parseSnapshot() {
List<AC35Data> snapshotMessages = new ArrayList<>();
//Parse the boat locations.
snapshotMessages.addAll(parseBoatLocations());
//Parse the marks.
snapshotMessages.addAll(parseMarks());
//Parse the race status.
snapshotMessages.add(parseRaceStatus());
latestMessages.setSnapshot(snapshotMessages);
}
/**
* Parses an individual marker boat, and returns it.
* @param mark The marker boat to parse. * @param mark The marker boat to parse.
* @return The BoatLocation message.
*/ */
private void parseIndividualMark(Mark mark) { private BoatLocation parseIndividualMark(Mark mark) {
//Create message. //Create message.
BoatLocation boatLocation = new BoatLocation( BoatLocation boatLocation = new BoatLocation(
mark.getSourceID(), mark.getSourceID(),
@ -57,13 +71,17 @@ public class RaceServer {
//Iterates the sequence number. //Iterates the sequence number.
this.boatLocationSequenceNumber++; this.boatLocationSequenceNumber++;
this.latestMessages.setBoatLocation(boatLocation); return boatLocation;
} }
/** /**
* Parse the compound marker boats through mock output. * Parse the compound marker boats, and returns a list of BoatLocation messages.
* @return BoatLocation messages for each mark.
*/ */
public void parseMarks() { private List<BoatLocation> parseMarks() {
List<BoatLocation> markLocations = new ArrayList<>(race.getCompoundMarks().size());
for (CompoundMark compoundMark : race.getCompoundMarks()) { for (CompoundMark compoundMark : race.getCompoundMarks()) {
//Get the individual marks from the compound mark. //Get the individual marks from the compound mark.
@ -72,31 +90,40 @@ public class RaceServer {
//If they aren't null, parse them (some compound marks only have one mark). //If they aren't null, parse them (some compound marks only have one mark).
if (mark1 != null) { if (mark1 != null) {
this.parseIndividualMark(mark1); markLocations.add(this.parseIndividualMark(mark1));
} }
if (mark2 != null) { if (mark2 != null) {
this.parseIndividualMark(mark2); markLocations.add(this.parseIndividualMark(mark2));
} }
} }
return markLocations;
} }
/** /**
* Parse the boats in the race, and send it to mockOutput. * Parse the boats in the race, and returns all of their BoatLocation messages.
* @return List of BoatLocation messages, for each boat.
*/ */
public void parseBoatLocations() { private List<BoatLocation> parseBoatLocations() {
List<BoatLocation> boatLocations = new ArrayList<>(race.getBoats().size());
//Parse each boat. //Parse each boat.
for (MockBoat boat : race.getBoats()) { for (MockBoat boat : race.getBoats()) {
this.parseIndividualBoatLocation(boat); boatLocations.add(this.parseIndividualBoatLocation(boat));
} }
return boatLocations;
} }
/** /**
* Parses an individual boat, and sends it to mockOutput. * Parses an individual boat, and returns it.
* @param boat The boat to parse. * @param boat The boat to parse.
* @return The BoatLocation message.
*/ */
private void parseIndividualBoatLocation(MockBoat boat) { private BoatLocation parseIndividualBoatLocation(MockBoat boat) {
BoatLocation boatLocation = new BoatLocation( BoatLocation boatLocation = new BoatLocation(
boat.getSourceID(), boat.getSourceID(),
@ -111,16 +138,17 @@ public class RaceServer {
//Iterates the sequence number. //Iterates the sequence number.
this.boatLocationSequenceNumber++; this.boatLocationSequenceNumber++;
this.latestMessages.setBoatLocation(boatLocation); return boatLocation;
} }
/** /**
* Parses the race status, and sends it to mockOutput. * Parses the race status, and returns it.
* @return The race status message.
*/ */
public void parseRaceStatus() { private RaceStatus parseRaceStatus() {
//A race status message contains a list of boat statuses. //A race status message contains a list of boat statuses.
List<BoatStatus> boatStatuses = new ArrayList<>(); List<BoatStatus> boatStatuses = new ArrayList<>();
@ -151,6 +179,6 @@ public class RaceServer {
race.getRaceType(), race.getRaceType(),
boatStatuses); boatStatuses);
this.latestMessages.setRaceStatus(raceStatus); return raceStatus;
} }
} }

@ -0,0 +1,70 @@
package mock.model;
import mock.exceptions.SourceIDAllocationException;
import java.util.ArrayList;
import java.util.List;
/**
* This class is responsible for allocating boat source IDs for use in a race, upon request.
*/
public class SourceIdAllocator {
/**
* This list contains all unallocated source IDs.
*/
List<Integer> unallocatedIDs = new ArrayList<>();
/**
* This list contains all allocated source IDs.
*/
List<Integer> allocatedIDs = new ArrayList<>();
/**
* Creates a source ID allocator, using the given list of unallocated source IDs.
* @param unallocatedIDs List of unallocated source IDs.
*/
public SourceIdAllocator(List<Integer> unallocatedIDs) {
//We need to copy the list.
this.unallocatedIDs.addAll(unallocatedIDs);
}
/**
* Allocates a source ID for a boat.
* @return The allocated source ID.
* @throws SourceIDAllocationException Thrown if we cannot allocate any more source IDs.
*/
public synchronized int allocateSourceID() throws SourceIDAllocationException {
if (!unallocatedIDs.isEmpty()) {
int sourceID = unallocatedIDs.remove(0);
allocatedIDs.add(sourceID);
return sourceID;
} else {
throw new SourceIDAllocationException("Could not allocate a source ID.");
}
}
/**
* Returns a source ID to the source ID allocator, so that it can be reused.
* @param sourceID Source ID to return.
*/
public void returnSourceID(Integer sourceID) {
//We remove an Integer, not an int, so that we remove by value not by index.
allocatedIDs.remove(sourceID);
unallocatedIDs.add(sourceID);
}
}

@ -0,0 +1,25 @@
package mock.model.commandFactory;
import mock.model.MockRace;
import java.util.Stack;
/**
* Wraps multiple commands into a composite to execute queued commands during a frame.
*/
public class CompositeCommand implements Command {
private Stack<Command> commands;
public CompositeCommand() {
this.commands = new Stack<>();
}
public void addCommand(Command command) {
commands.push(command);
}
@Override
public void execute(MockRace race) {
while(!commands.isEmpty()) commands.pop().execute(race);
}
}

@ -0,0 +1,9 @@
package network.MessageControllers;
public class MessageController {
}

@ -1,6 +1,7 @@
package network.MessageEncoders; package network.MessageEncoders;
import network.BinaryMessageEncoder;
import network.Exceptions.InvalidMessageException; import network.Exceptions.InvalidMessageException;
import network.Exceptions.InvalidMessageTypeException; import network.Exceptions.InvalidMessageTypeException;
import network.Messages.*; import network.Messages.*;
@ -104,7 +105,7 @@ public class RaceVisionByteEncoder {
/** /**
* Encodes a given message. * Encodes a given message, to be placed inside a binary message (see {@link BinaryMessageEncoder}).
* @param message Message to encode. * @param message Message to encode.
* @return Encoded message. * @return Encoded message.
* @throws InvalidMessageException If the message cannot be encoded. * @throws InvalidMessageException If the message cannot be encoded.
@ -126,4 +127,29 @@ public class RaceVisionByteEncoder {
} }
/**
* Encodes a given messages, using a given ackNumber, and returns a binary message ready to be sent over-the-wire.
* @param message The message to send.
* @param ackNumber The ackNumber of the message.
* @return A binary message ready to be transmitted.
* @throws InvalidMessageException Thrown if the message cannot be encoded.
*/
public static byte[] encodeBinaryMessage(AC35Data message, int ackNumber) throws InvalidMessageException {
//Encodes the message.
byte[] encodedMessage = RaceVisionByteEncoder.encode(message);
//Encodes the full message with header.
BinaryMessageEncoder binaryMessageEncoder = new BinaryMessageEncoder(
message.getType(),
System.currentTimeMillis(),
ackNumber,
(short) encodedMessage.length,
encodedMessage );
return binaryMessageEncoder.getFullMessage();
}
} }

@ -0,0 +1,11 @@
package network.MessageRouters;
/**
* This class routes {@link network.Messages.AC35Data} messages to an appropriate message controller.
*/
public class MessageRouter {
}

@ -13,6 +13,12 @@ public class BoatAction extends AC35Data {
*/ */
private BoatActionEnum boatAction; private BoatActionEnum boatAction;
/**
* The source ID of the boat this action relates to.
*/
private int sourceID = 0;
/** /**
* Constructs a BoatActon message with a given action. * Constructs a BoatActon message with a given action.
* @param boatAction Action to use. * @param boatAction Action to use.
@ -30,4 +36,19 @@ public class BoatAction extends AC35Data {
return boatAction; return boatAction;
} }
/**
* Returns the boat source ID for this message.
* @return The source ID for this message.
*/
public int getSourceID() {
return sourceID;
}
/**
* Sets the boat source ID for this message.
* @param sourceID The source for this message.
*/
public void setSourceID(int sourceID) {
this.sourceID = sourceID;
}
} }

@ -3,9 +3,7 @@ package network.Messages;
import network.Messages.Enums.XMLMessageType; import network.Messages.Enums.XMLMessageType;
import shared.dataInput.RaceDataSource; import shared.dataInput.RaceDataSource;
import java.util.HashMap; import java.util.*;
import java.util.Map;
import java.util.Observable;
/** /**
* This class contains a set of the latest messages received (e.g., the latest RaceStatus, the latest BoatLocation for each boat, etc...). * This class contains a set of the latest messages received (e.g., the latest RaceStatus, the latest BoatLocation for each boat, etc...).
@ -44,6 +42,12 @@ public class LatestMessages extends Observable {
private CourseWinds courseWinds; private CourseWinds courseWinds;
/**
* A list of messages containing a snapshot of the race.
*/
private List<AC35Data> snapshot = new ArrayList<>();
/** /**
* The latest race data XML message. * The latest race data XML message.
*/ */
@ -69,6 +73,22 @@ public class LatestMessages extends Observable {
} }
/**
* Returns a copy of the race snapshot.
* @return Copy of the race snapshot.
*/
public List<AC35Data> getSnapshot() {
return new ArrayList<>(snapshot);
}
/**
* Sets the snapshot of the race.
* @param snapshot New snapshot of race.
*/
public void setSnapshot(List<AC35Data> snapshot) {
this.snapshot = snapshot;
}
/** /**

@ -0,0 +1,41 @@
package network.Messages;
import java.util.ArrayList;
import java.util.List;
/**
* Represents a snapshot of the race's state.
* Contains a list of {@link AC35Data} messages.
* Send a copy of each message to a connected client.
*/
public class RaceSnapshot {
/**
* The contents of the snapshot.
*/
private List<AC35Data> snapshot;
/**
* Constructs a snapshot using a given list of messages.
* @param snapshot Messages to use as snapshot.
*/
public RaceSnapshot(List<AC35Data> snapshot) {
this.snapshot = snapshot;
}
/**
* Gets the contents of the snapshot.
* This is a shallow copy.
* @return Contents of the snapshot.
*/
public List<AC35Data> getSnapshot() {
List<AC35Data> copy = new ArrayList<>(snapshot);
return copy;
}
}

@ -0,0 +1,156 @@
package network.StreamRelated;
import network.BinaryMessageDecoder;
import network.Exceptions.InvalidMessageException;
import network.MessageEncoders.RaceVisionByteEncoder;
import network.Messages.AC35Data;
import shared.model.RunnableWithFramePeriod;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import static network.Utils.ByteConverter.bytesToShort;
/**
* This class is responsible for converting data from an input stream into a queue of {@link AC35Data} messages.
*/
public class MessageDeserialiser implements RunnableWithFramePeriod {
/**
* The stream we're reading from.
*/
private DataInputStream inputStream;
/**
* The messages we've read.
*/
private BlockingQueue<AC35Data> messagesRead;
/**
* Ack numbers used in messages.
*/
private int ackNumber = 1;
/**
* Constructs a new MessageSerialiser to write a queue of messages to a given stream.
* @param inputStream The stream to write to.
* @param messagesRead The messages to send.
*/
public MessageDeserialiser(InputStream inputStream, BlockingQueue<AC35Data> messagesRead) {
this.inputStream = new DataInputStream(inputStream);
this.messagesRead = messagesRead;
}
/**
* Increments the ackNumber value, and returns it.
* @return Incremented ackNumber.
*/
private int getNextAckNumber(){
this.ackNumber++;
return this.ackNumber;
}
/**
* Reads and returns the next message as an array of bytes from the input stream. Use getNextMessage() to get the actual message object instead.
* @return Encoded binary message bytes.
* @throws IOException Thrown when an error occurs while reading from the input stream.
*/
private byte[] getNextMessageBytes() throws IOException {
inputStream.mark(0);
short CRCLength = 4;
short headerLength = 15;
//Read the header of the next message.
byte[] headerBytes = new byte[headerLength];
inputStream.readFully(headerBytes);
//Read the message body length.
byte[] messageBodyLengthBytes = Arrays.copyOfRange(headerBytes, headerLength - 2, headerLength);
short messageBodyLength = bytesToShort(messageBodyLengthBytes);
//Read the message body.
byte[] messageBodyBytes = new byte[messageBodyLength];
inputStream.readFully(messageBodyBytes);
//Read the message CRC.
byte[] messageCRCBytes = new byte[CRCLength];
inputStream.readFully(messageCRCBytes);
//Put the head + body + crc into one large array.
ByteBuffer messageBytes = ByteBuffer.allocate(headerBytes.length + messageBodyBytes.length + messageCRCBytes.length);
messageBytes.put(headerBytes);
messageBytes.put(messageBodyBytes);
messageBytes.put(messageCRCBytes);
return messageBytes.array();
}
/**
* Reads and returns the next message object from the input stream.
* @return The message object.
* @throws IOException Thrown when an error occurs while reading from the input stream.
* @throws InvalidMessageException Thrown when the message is invalid in some way.
*/
private AC35Data getNextMessage() throws IOException, InvalidMessageException
{
//Get the next message from the socket as a block of bytes.
byte[] messageBytes = this.getNextMessageBytes();
//Decode the binary message into an appropriate message object.
BinaryMessageDecoder decoder = new BinaryMessageDecoder(messageBytes);
return decoder.decode();
}
@Override
public void run() {
long previousFrameTime = System.currentTimeMillis();
while (!Thread.interrupted()) {
long currentFrameTime = System.currentTimeMillis();
waitForFramePeriod(previousFrameTime, currentFrameTime, 16);
previousFrameTime = currentFrameTime;
//Reads the next message.
try {
AC35Data message = this.getNextMessage();
messagesRead.add(message);
}
catch (InvalidMessageException | IOException e) {
Logger.getGlobal().log(Level.WARNING, "Unable to read message.", e);
try {
inputStream.reset();
} catch (IOException e1) {
Logger.getGlobal().log(Level.WARNING, "Unable to reset inputStream.", e);
}
}
}
}
}

@ -0,0 +1,116 @@
package network.StreamRelated;
import network.Exceptions.InvalidMessageException;
import network.MessageEncoders.RaceVisionByteEncoder;
import network.Messages.AC35Data;
import shared.model.RunnableWithFramePeriod;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* This class is responsible for writing a queue of {@link network.Messages.AC35Data} messages to an output stream.
*/
public class MessageSerialiser implements RunnableWithFramePeriod {
/**
* The stream we're writing to.
*/
private DataOutputStream outputStream;
/**
* The messages we're writing to the stream.
*/
private BlockingQueue<AC35Data> messagesToSend;
/**
* Ack numbers used in messages.
*/
private int ackNumber = 1;
/**
* Determines whether or not this runnable is currently running.
*/
private boolean isRunning;
/**
* Constructs a new MessageSerialiser to write a queue of messages to a given stream.
* @param outputStream The stream to write to.
* @param messagesToSend The messages to send.
*/
public MessageSerialiser(OutputStream outputStream, BlockingQueue<AC35Data> messagesToSend) {
this.outputStream = new DataOutputStream(outputStream);
this.messagesToSend = messagesToSend;
}
/**
* Increments the ackNumber value, and returns it.
* @return Incremented ackNumber.
*/
private int getNextAckNumber(){
this.ackNumber++;
return this.ackNumber;
}
/**
* Determines whether or not this runnable is running.
* @return True means that it is still running, false means that it has stopped.
*/
public boolean isRunning() {
return isRunning;
}
@Override
public void run() {
long previousFrameTime = System.currentTimeMillis();
isRunning = true;
while (isRunning) {
long currentFrameTime = System.currentTimeMillis();
waitForFramePeriod(previousFrameTime, currentFrameTime, 16);
previousFrameTime = currentFrameTime;
//Send the messages.
List<AC35Data> messages = new ArrayList<>();
messagesToSend.drainTo(messages);
for (AC35Data message : messages) {
try {
byte[] messageBytes = RaceVisionByteEncoder.encodeBinaryMessage(message, getNextAckNumber());
outputStream.write(messageBytes);
} catch (InvalidMessageException e) {
Logger.getGlobal().log(Level.WARNING, "Could not encode message: " + message, e);
} catch (IOException e) {
Logger.getGlobal().log(Level.WARNING, "Could not write message to outputStream: " + outputStream, e);
isRunning = false;
}
}
}
}
}

@ -0,0 +1,15 @@
package shared.exceptions;
/**
* An exception thrown when a specific boat cannot be found.
*/
public class BoatNotFoundException extends Exception {
public BoatNotFoundException(String message) {
super(message);
}
public BoatNotFoundException(String message, Throwable cause) {
super(message, cause);
}
}

@ -0,0 +1,24 @@
package shared.exceptions;
/**
* An exception thrown when we the client-server handshake fails.
*/
public class HandshakeException extends Exception {
/**
* Constructs the exception with a given message.
* @param message Message to store.
*/
public HandshakeException(String message) {
super(message);
}
/**
* Constructs the exception with a given message and cause.
* @param message Message to store.
* @param cause Cause to store.
*/
public HandshakeException(String message, Throwable cause) {
super(message, cause);
}
}

@ -0,0 +1,64 @@
package shared.model;
import network.Exceptions.InvalidMessageException;
import network.MessageEncoders.RaceVisionByteEncoder;
import network.Messages.AC35Data;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* This interface is a {@link Runnable} interface, with the ability to sleep until a given time period has elapsed.
*/
public interface RunnableWithFramePeriod extends Runnable {
/**
* Waits for enough time for the period of this frame to be greater than minimumFramePeriod.
* @param previousFrameTime The timestamp of the previous frame.
* @param currentFrameTime The timestamp of the current frame.
* @param minimumFramePeriod The minimum period the frame must be.
*/
default void waitForFramePeriod(long previousFrameTime, long currentFrameTime, long minimumFramePeriod) {
//This is the time elapsed, in milliseconds, since the last server "frame".
long framePeriod = currentFrameTime - previousFrameTime;
//We only attempt to send packets every X milliseconds.
if (framePeriod >= minimumFramePeriod) {
return;
} else {
//Wait until the frame period will be large enough.
long timeToWait = minimumFramePeriod - framePeriod;
try {
Thread.sleep(timeToWait);
} catch (InterruptedException e) {
//If we get interrupted, exit the function.
Logger.getGlobal().log(Level.SEVERE, "RunnableWithFramePeriod.waitForFramePeriod().sleep(framePeriod) was interrupted on thread: " + Thread.currentThread(), e);
//Re-set the interrupt flag.
Thread.currentThread().interrupt();
return;
}
}
}
}

@ -144,32 +144,5 @@ public class ConnectionController extends Controller {
} }
} }
/**
* Sets up a new host
*/
public void addLocal() {
try {
//We don't want to host more than one game.
if (!currentlyHostingGame) {
Event game = Event.getEvent();
urlField.textProperty().set(game.getAddress());
portField.textProperty().set(Integer.toString(game.getPort()));
game.start();
addConnection();
currentlyHostingGame = true;
}
} catch (InvalidRaceDataException e) {
e.printStackTrace();
} catch (XMLReaderException e) {
e.printStackTrace();
} catch (InvalidBoatDataException e) {
e.printStackTrace();
} catch (InvalidRegattaDataException e) {
e.printStackTrace();
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
} }

@ -6,6 +6,7 @@ import javafx.scene.control.*;
import javafx.scene.layout.AnchorPane; import javafx.scene.layout.AnchorPane;
import javafx.stage.Stage; import javafx.stage.Stage;
import mock.app.Event; import mock.app.Event;
import mock.exceptions.EventConstructionException;
import shared.exceptions.InvalidBoatDataException; import shared.exceptions.InvalidBoatDataException;
import shared.exceptions.InvalidRaceDataException; import shared.exceptions.InvalidRaceDataException;
import shared.exceptions.InvalidRegattaDataException; import shared.exceptions.InvalidRegattaDataException;
@ -17,6 +18,8 @@ import java.net.Socket;
import java.net.URL; import java.net.URL;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ResourceBundle; import java.util.ResourceBundle;
import java.util.logging.Level;
import java.util.logging.Logger;
/** /**
* Controller for Hosting a game. * Controller for Hosting a game.
@ -44,17 +47,12 @@ public class HostController extends Controller {
*/ */
public void hostGamePressed() throws IOException{ public void hostGamePressed() throws IOException{
try { try {
Event game = Event.getEvent(); Event game = new Event();
game.start(); game.start();
connectSocket("localhost", 4942); connectSocket("localhost", 4942);
} catch (InvalidRaceDataException e) { } catch (EventConstructionException e) {
e.printStackTrace(); Logger.getGlobal().log(Level.SEVERE, "Could not create Event.", e);
} catch (XMLReaderException e) { throw new RuntimeException(e);
e.printStackTrace();
} catch (InvalidBoatDataException e) {
e.printStackTrace();
} catch (InvalidRegattaDataException e) {
e.printStackTrace();
} }
} }

@ -26,6 +26,8 @@ import visualiser.model.*;
import java.io.IOException; import java.io.IOException;
import java.net.URL; import java.net.URL;
import java.util.ResourceBundle; import java.util.ResourceBundle;
import java.util.logging.Level;
import java.util.logging.Logger;
/** /**
* Controller used to display a running race. * Controller used to display a running race.
@ -115,8 +117,9 @@ public class RaceController extends Controller {
controllerClient.sendKey(controlKey); controllerClient.sendKey(controlKey);
controlKey.onAction(); // Change key state if applicable controlKey.onAction(); // Change key state if applicable
event.consume(); event.consume();
} catch (IOException e) { } catch (InterruptedException e) {
e.printStackTrace(); Thread.currentThread().interrupt();
Logger.getGlobal().log(Level.WARNING, "RaceController was interrupted on thread: " + Thread.currentThread() + "while sending: " + controlKey, e);
} }
} }
}); });

@ -20,6 +20,7 @@ import shared.exceptions.InvalidRegattaDataException;
import shared.exceptions.XMLReaderException; import shared.exceptions.XMLReaderException;
import visualiser.app.VisualiserInput; import visualiser.app.VisualiserInput;
import visualiser.gameController.ControllerClient; import visualiser.gameController.ControllerClient;
import visualiser.model.ServerConnection;
import visualiser.model.VisualiserBoat; import visualiser.model.VisualiserBoat;
import visualiser.model.VisualiserRace; import visualiser.model.VisualiserRace;
@ -27,6 +28,8 @@ import java.io.IOException;
import java.net.Socket; import java.net.Socket;
import java.net.URL; import java.net.URL;
import java.util.*; import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;
/** /**
* Controller to for waiting for the race to start. * Controller to for waiting for the race to start.
@ -66,18 +69,18 @@ public class StartController extends Controller implements Observer {
@FXML private Label raceStatusLabel; @FXML private Label raceStatusLabel;
/** /**
* The object used to read packets from the connected server. * Our connection to the server.
*/ */
private VisualiserInput visualiserInput; private ServerConnection serverConnection;
/** /**
* The race object which describes the currently occurring race. * The race object which describes the currently occurring race.
*/ */
private VisualiserRace visualiserRace; private VisualiserRace visualiserRace;
private ControllerClient controllerClient;
/** /**
* An array of colors used to assign colors to each boat - passed in to the VisualiserRace constructor. * An array of colors used to assign colors to each boat - passed in to the VisualiserRace constructor.
@ -309,17 +312,17 @@ public class StartController extends Controller implements Observer {
public void enterLobby(Socket socket) { public void enterLobby(Socket socket) {
startWrapper.setVisible(true); startWrapper.setVisible(true);
try { try {
//Begin reading packets from the socket/server.
this.visualiserInput = new VisualiserInput(socket); LatestMessages latestMessages = new LatestMessages();
//Send controller input to server this.serverConnection = new ServerConnection(socket, latestMessages);
this.controllerClient = new ControllerClient(socket);
//Store a reference to latestMessages so that we can observe it. //Store a reference to latestMessages so that we can observe it.
LatestMessages latestMessages = this.visualiserInput.getLatestMessages();
latestMessages.addObserver(this); latestMessages.addObserver(this);
new Thread(this.visualiserInput).start(); new Thread(this.serverConnection).start();
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); Logger.getGlobal().log(Level.WARNING, "Could not connection to server.", e);
} }
} }

@ -2,12 +2,14 @@ package visualiser.app;
import network.BinaryMessageDecoder; import network.BinaryMessageDecoder;
import network.Exceptions.InvalidMessageException; import network.Exceptions.InvalidMessageException;
import network.Messages.*; import network.Messages.*;
import shared.model.RunnableWithFramePeriod;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.IOException; import java.io.IOException;
import java.net.Socket; import java.net.Socket;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import static network.Utils.ByteConverter.bytesToShort; import static network.Utils.ByteConverter.bytesToShort;
@ -15,7 +17,7 @@ import static network.Utils.ByteConverter.bytesToShort;
* TCP client which receives packets/messages from a race data source * TCP client which receives packets/messages from a race data source
* (e.g., mock source, official source), and exposes them to any observers. * (e.g., mock source, official source), and exposes them to any observers.
*/ */
public class VisualiserInput implements Runnable { public class VisualiserInput implements RunnableWithFramePeriod {
/** /**
* Timestamp of the last heartbeat. * Timestamp of the last heartbeat.
@ -27,40 +29,28 @@ public class VisualiserInput implements Runnable {
private long lastHeartbeatSequenceNum = -1; private long lastHeartbeatSequenceNum = -1;
/**
* The socket that we have connected to.
*/
private Socket connectionSocket;
/** /**
* InputStream (from the socket). * Incoming messages from server.
*/ */
private DataInputStream inStream; private BlockingQueue<AC35Data> incomingMessages;
/** /**
* An object containing the set of latest messages to write to. * An object containing the set of latest messages to write to.
* Every server frame, VisualiserInput reads messages from its inputStream, and write them to this. * Every server frame, VisualiserInput reads messages from its incomingMessages, and write them to this.
*/ */
private LatestMessages latestMessages; private LatestMessages latestMessages;
/** /**
* Ctor. * Constructs a visualiserInput to convert an incoming stream of messages into LatestMessages.
* @param socket Socket from which we will receive race data. * @param latestMessages Object to place messages in.
* @throws IOException If there is something wrong with the socket's input stream. * @param incomingMessages The incoming queue of messages.
*/ */
public VisualiserInput(Socket socket) throws IOException { public VisualiserInput(LatestMessages latestMessages, BlockingQueue<AC35Data> incomingMessages) {
this.latestMessages = latestMessages;
this.connectionSocket = socket; this.incomingMessages = incomingMessages;
//We wrap a DataInputStream around the socket's InputStream because it has the stream.readFully(buffer) function, which is a blocking read until the buffer has been filled.
this.inStream = new DataInputStream(connectionSocket.getInputStream());
this.latestMessages = new LatestMessages();
this.lastHeartbeatTime = System.currentTimeMillis(); this.lastHeartbeatTime = System.currentTimeMillis();
} }
@ -85,279 +75,21 @@ public class VisualiserInput implements Runnable {
/**
* Reads and returns the next message as an array of bytes from the socket. Use getNextMessage() to get the actual message object instead.
* @return Encoded binary message bytes.
* @throws IOException Thrown when an error occurs while reading from the socket.
*/
private byte[] getNextMessageBytes() throws IOException {
inStream.mark(0);
short CRCLength = 4;
short headerLength = 15;
//Read the header of the next message.
byte[] headerBytes = new byte[headerLength];
inStream.readFully(headerBytes);
//Read the message body length.
byte[] messageBodyLengthBytes = Arrays.copyOfRange(headerBytes, headerLength - 2, headerLength);
short messageBodyLength = bytesToShort(messageBodyLengthBytes);
//Read the message body.
byte[] messageBodyBytes = new byte[messageBodyLength];
inStream.readFully(messageBodyBytes);
//Read the message CRC.
byte[] messageCRCBytes = new byte[CRCLength];
inStream.readFully(messageCRCBytes);
//Put the head + body + crc into one large array.
ByteBuffer messageBytes = ByteBuffer.allocate(headerBytes.length + messageBodyBytes.length + messageCRCBytes.length);
messageBytes.put(headerBytes);
messageBytes.put(messageBodyBytes);
messageBytes.put(messageCRCBytes);
return messageBytes.array();
}
/**
* Reads and returns the next message object from the socket.
* @return The message object. Use instanceof for concrete type.
* @throws IOException Thrown when an error occurs while reading from the socket.
* @throws InvalidMessageException Thrown when the message is invalid in some way.
*/
private AC35Data getNextMessage() throws IOException, InvalidMessageException
{
//Get the next message from the socket as a block of bytes.
byte[] messageBytes = this.getNextMessageBytes();
//Decode the binary message into an appropriate message object.
BinaryMessageDecoder decoder = new BinaryMessageDecoder(messageBytes);
return decoder.decode();
}
/**
* Main loop which reads messages from the socket, and exposes them.
*/
public void run(){
boolean receiverLoop = true;
//receiver loop that gets the input
while (receiverLoop) {
//If no heartbeat has been received in more the heartbeat period
//then the connection will need to be restarted.
//System.out.println("time since last heartbeat: " + timeSinceHeartbeat());//TEMP REMOVE
long heartBeatPeriod = 10 * 1000;
if (timeSinceHeartbeat() > heartBeatPeriod) {
System.out.println("Connection has stopped, trying to reconnect.");
//Attempt to reconnect the socket.
try {//This attempt doesn't really work. Under what circumstances would
this.connectionSocket = new Socket(this.connectionSocket.getInetAddress(), this.connectionSocket.getPort());
//this.connectionSocket.connect(this.connectionSocket.getRemoteSocketAddress());
//Reset the heartbeat timer.
this.lastHeartbeatTime = System.currentTimeMillis();
}
catch (IOException e) {
System.err.println("Unable to reconnect.");
//Wait 500ms. Ugly hack, should refactor.
long waitPeriod = 500;
long waitTimeStart = System.currentTimeMillis() + waitPeriod;
while (System.currentTimeMillis() < waitTimeStart){
//Nothing. Busyloop.
}
//Swallow the exception.
continue;
}
}
//Reads the next message.
AC35Data message;
try {
message = this.getNextMessage();
}
catch (InvalidMessageException | IOException e) {
//Prints exception to stderr, and iterate loop (that is, read the next message).
System.err.println("Unable to read message: " + e.getMessage());
try {
inStream.reset();
} catch (IOException e1) {
e1.printStackTrace();
}
//Continue to the next loop iteration/message.
continue;
}
//Checks which message is being received and does what is needed for that message.
switch (message.getType()) {
//Heartbeat.
case HEARTBEAT: {
HeartBeat heartBeat = (HeartBeat) message;
//Check that the heartbeat number is greater than the previous value, and then set the last heartbeat time.
if (heartBeat.getSequenceNumber() > this.lastHeartbeatSequenceNum) {
lastHeartbeatTime = System.currentTimeMillis();
lastHeartbeatSequenceNum = heartBeat.getSequenceNumber();
//System.out.println("HeartBeat Message! " + lastHeartbeatSequenceNum);
}
break;
}
//RaceStatus.
case RACESTATUS: {
RaceStatus raceStatus = (RaceStatus) message;
//System.out.println("Race Status Message");
this.latestMessages.setRaceStatus(raceStatus);
for (BoatStatus boatStatus : raceStatus.getBoatStatuses()) {
this.latestMessages.setBoatStatus(boatStatus);
}
break;
}
//DisplayTextMessage.
case DISPLAYTEXTMESSAGE: {
//System.out.println("Display Text Message");
//No decoder for this.
break;
}
//XMLMessage.
case XMLMESSAGE: {
XMLMessage xmlMessage = (XMLMessage) message;
//System.out.println("XML Message!");
this.latestMessages.setXMLMessage(xmlMessage);
break;
}
//RaceStartStatus.
case RACESTARTSTATUS: {
//System.out.println("Race Start Status Message");
break;
}
//YachtEventCode.
case YACHTEVENTCODE: {
//YachtEventCode yachtEventCode = (YachtEventCode) message;
//System.out.println("Yacht Event Code!");
//No decoder for this.
break;
}
//YachtActionCode.
case YACHTACTIONCODE: {
//YachtActionCode yachtActionCode = (YachtActionCode) message;
//System.out.println("Yacht Action Code!");
// No decoder for this.
break;
}
//ChatterText.
case CHATTERTEXT: {
//ChatterText chatterText = (ChatterText) message;
//System.out.println("Chatter Text Message!");
//No decoder for this.
break;
}
//BoatLocation.
case BOATLOCATION: {
BoatLocation boatLocation = (BoatLocation) message;
//System.out.println("Boat Location!");
BoatLocation existingBoatLocation = this.latestMessages.getBoatLocationMap().get(boatLocation.getSourceID());
if (existingBoatLocation != null) {
//If our boatlocation map already contains a boat location message for this boat, check that the new message is actually for a later timestamp (i.e., newer).
if (boatLocation.getTime() > existingBoatLocation.getTime()) {
//If it is, replace the old message.
this.latestMessages.setBoatLocation(boatLocation);
}
} else {
//If the map _doesn't_ already contain a message for this boat, insert the message.
this.latestMessages.setBoatLocation(boatLocation);
}
break;
}
//MarkRounding.
case MARKROUNDING: {
MarkRounding markRounding = (MarkRounding) message;
//System.out.println("Mark Rounding Message!");
MarkRounding existingMarkRounding = this.latestMessages.getMarkRoundingMap().get(markRounding.getSourceID());
if (existingMarkRounding != null) {
//If our markRoundingMap already contains a mark rounding message for this boat, check that the new message is actually for a later timestamp (i.e., newer).
if (markRounding.getTime() > existingMarkRounding.getTime()) {
//If it is, replace the old message.
this.latestMessages.setMarkRounding(markRounding);
}
} else {
//If the map _doesn't_ already contain a message for this boat, insert the message.
this.latestMessages.setMarkRounding(markRounding);
}
break;
}
//CourseWinds.
case COURSEWIND: {
//System.out.println("Course Wind Message!");
CourseWinds courseWinds = (CourseWinds) message;
this.latestMessages.setCourseWinds(courseWinds);
break; @Override
} public void run() {
//AverageWind. //Handshake.
case AVGWIND: {
//System.out.println("Average Wind Message!"); //Main loop.
AverageWind averageWind = (AverageWind) message; // take message
// create command
// place in command queue
this.latestMessages.setAverageWind(averageWind);
break;
}
//Unrecognised message.
default: {
System.out.println("Broken Message!");
break;
}
}
}
} }
} }

@ -3,6 +3,7 @@ package visualiser.gameController;
import network.BinaryMessageEncoder; import network.BinaryMessageEncoder;
import network.Exceptions.InvalidMessageException; import network.Exceptions.InvalidMessageException;
import network.MessageEncoders.RaceVisionByteEncoder; import network.MessageEncoders.RaceVisionByteEncoder;
import network.Messages.AC35Data;
import network.Messages.BoatAction; import network.Messages.BoatAction;
import network.Messages.Enums.BoatActionEnum; import network.Messages.Enums.BoatActionEnum;
import network.Messages.Enums.MessageType; import network.Messages.Enums.MessageType;
@ -13,6 +14,7 @@ import java.io.IOException;
import java.net.Socket; import java.net.Socket;
import java.net.SocketException; import java.net.SocketException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -20,28 +22,18 @@ import java.util.logging.Logger;
* Basic service for sending key presses to game server * Basic service for sending key presses to game server
*/ */
public class ControllerClient { public class ControllerClient {
/**
* Socket to server
*/
Socket socket;
/** /**
* Output stream wrapper for socket to server * Queue of messages to be sent to server.
*/ */
DataOutputStream outputStream; private BlockingQueue<AC35Data> outgoingMessages;
/** /**
* Initialise controller client with live socket. * Initialise controller client with live socket.
* @param socket to server * @param outgoingMessages Queue to place messages on to send to server.
*/ */
public ControllerClient(Socket socket) { public ControllerClient(BlockingQueue<AC35Data> outgoingMessages) {
this.socket = socket; this.outgoingMessages = outgoingMessages;
try {
this.outputStream = new DataOutputStream(socket.getOutputStream());
} catch (IOException e) {
e.printStackTrace();
}
} }
/** /**
@ -49,27 +41,13 @@ public class ControllerClient {
* @param key to send * @param key to send
* @throws IOException if socket write fails * @throws IOException if socket write fails
*/ */
public void sendKey(ControlKey key) throws IOException { public void sendKey(ControlKey key) throws InterruptedException {
BoatActionEnum protocolCode = key.getProtocolCode(); BoatActionEnum protocolCode = key.getProtocolCode();
if(protocolCode != BoatActionEnum.NOT_A_STATUS) { if(protocolCode != BoatActionEnum.NOT_A_STATUS) {
BoatAction boatAction = new BoatAction(protocolCode); BoatAction boatAction = new BoatAction(protocolCode);
//Encode BoatAction. outgoingMessages.put(boatAction);
try {
byte[] encodedBoatAction = RaceVisionByteEncoder.encode(boatAction);
BinaryMessageEncoder binaryMessage = new BinaryMessageEncoder(MessageType.BOATACTION, System.currentTimeMillis(), 0,
(short) encodedBoatAction.length, encodedBoatAction);
System.out.println("Sending out key: " + protocolCode);
outputStream.write(binaryMessage.getFullMessage());
} catch (InvalidMessageException e) {
Logger.getGlobal().log(Level.WARNING, "Could not encode BoatAction: " + boatAction, e);
}
} }
} }

@ -1,16 +1,19 @@
package visualiser.gameController; package visualiser.gameController;
import mock.model.commandFactory.Command;
import mock.model.commandFactory.CommandFactory;
import mock.model.commandFactory.CompositeCommand;
import network.BinaryMessageDecoder; import network.BinaryMessageDecoder;
import network.Exceptions.InvalidMessageException; import network.Exceptions.InvalidMessageException;
import network.MessageDecoders.BoatActionDecoder; import network.MessageDecoders.BoatActionDecoder;
import network.Messages.AC35Data;
import network.Messages.BoatAction; import network.Messages.BoatAction;
import network.Messages.Enums.BoatActionEnum; import network.Messages.Enums.MessageType;
import visualiser.gameController.Keys.ControlKey;
import visualiser.gameController.Keys.KeyFactory;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.IOException; import java.io.IOException;
import java.net.Socket; import java.io.InputStream;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -18,57 +21,69 @@ import java.util.logging.Logger;
* Service for dispatching key press data to race from client * Service for dispatching key press data to race from client
*/ */
public class ControllerServer implements Runnable { public class ControllerServer implements Runnable {
/** /**
* Socket to client * Queue of incoming messages from client.
*/ */
private Socket socket; private BlockingQueue<AC35Data> inputQueue;
/** /**
* Wrapper for input from client * Collection of commands from client for race to execute.
*/ */
private DataInputStream inputStream; private CompositeCommand compositeCommand;
/** /**
* Initialise server-side controller with live client socket * This is the source ID associated with the client.
* @param socket to client
*/ */
public ControllerServer(Socket socket) { private int clientSourceID;
this.socket = socket;
try {
this.inputStream = new DataInputStream(this.socket.getInputStream());
} catch (IOException e) { /**
e.printStackTrace(); * Initialise server-side controller with live client socket.
} * @param compositeCommand Commands for the race to execute.
* @param inputQueue The queue of messages to read from.
* @param clientSourceID The source ID of the client's boat.
*/
public ControllerServer(CompositeCommand compositeCommand, BlockingQueue<AC35Data> inputQueue, int clientSourceID) {
this.compositeCommand = compositeCommand;
this.inputQueue = inputQueue;
this.clientSourceID = clientSourceID;
} }
/** /**
* Wait for controller key input from client and loop. * Wait for controller key input from client and loop.
*/ */
@Override @Override
public void run() { public void run() {
while(true) { while(!Thread.interrupted()) {
byte[] message = new byte[20];
try {
if (inputStream.available() > 0) {
inputStream.read(message); try {
BinaryMessageDecoder encodedMessage = new BinaryMessageDecoder(message); AC35Data message = inputQueue.take();
BoatActionDecoder boatActionDecoder = new BoatActionDecoder();
try { if (message.getType() == MessageType.BOATACTION) {
boatActionDecoder.decode(encodedMessage.getMessageBody());
BoatAction boatAction = boatActionDecoder.getMessage();
System.out.println("Received key: " + boatAction.getBoatAction());
} catch (InvalidMessageException e) { BoatAction boatAction = (BoatAction) message;
Logger.getGlobal().log(Level.WARNING, "Could not decode BoatAction message.", e);
}
boatAction.setSourceID(clientSourceID);
Command command = CommandFactory.createCommand(boatAction);
compositeCommand.addCommand(command);
} }
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
Logger.getGlobal().log(Level.WARNING, "ControllerServer Interrupted while waiting for message on incoming message queue.", e);
Thread.currentThread().interrupt();
return;
} }
} }
} }
} }

@ -27,8 +27,8 @@ public class KeyFactory {
keyState.put("SPACE", new VMGKey("VMG")); keyState.put("SPACE", new VMGKey("VMG"));
keyState.put("SHIFT", new SailsToggleKey("Toggle Sails")); keyState.put("SHIFT", new SailsToggleKey("Toggle Sails"));
keyState.put("ENTER", new TackGybeKey("Tack/Gybe")); keyState.put("ENTER", new TackGybeKey("Tack/Gybe"));
keyState.put("PAGE_UP", new UpWindKey("Upwind")); keyState.put("UP", new UpWindKey("Upwind"));
keyState.put("PAGE_DOWN", new DownWindKey("Downwind")); keyState.put("DOWN", new DownWindKey("Downwind"));
} }
/** /**

@ -0,0 +1,228 @@
package visualiser.model;
import mock.app.MockOutput;
import mock.enums.ConnectionStateEnum;
import mock.exceptions.SourceIDAllocationException;
import mock.model.HeartBeatService;
import mock.model.SourceIdAllocator;
import mock.model.commandFactory.CompositeCommand;
import network.Messages.AC35Data;
import network.Messages.Enums.JoinAcceptanceEnum;
import network.Messages.Enums.MessageType;
import network.Messages.Enums.RequestToJoinEnum;
import network.Messages.JoinAcceptance;
import network.Messages.LatestMessages;
import network.Messages.RequestToJoin;
import network.StreamRelated.MessageDeserialiser;
import network.StreamRelated.MessageSerialiser;
import shared.exceptions.HandshakeException;
import visualiser.app.VisualiserInput;
import visualiser.gameController.ControllerClient;
import visualiser.gameController.ControllerServer;
import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* This class handles the client-server connection handshake, and creation of VisualiserInput and ControllerClient.
*/
public class ServerConnection implements Runnable {
/**
* The socket for the connection to server.
*/
private Socket socket;
/**
* Latest snapshot of the race, received from the server.
*/
private LatestMessages latestMessages;
/**
* Used to convert incoming messages into a race snapshot.
*/
private VisualiserInput visualiserInput;
/**
* Used to send client input to server.
*/
private ControllerClient controllerClient;
/**
* Used to write messages to socket.
*/
private MessageSerialiser messageSerialiser;
/**
* Stores messages to write to socket.
*/
private BlockingQueue<AC35Data> outputQueue;
/**
* Used to read messages from socket.
*/
private MessageDeserialiser messageDeserialiser;
/**
* Stores messages read from socket.
*/
private BlockingQueue<AC35Data> inputQueue;
/**
* The state of the connection to the client.
*/
private ConnectionStateEnum connectionState = ConnectionStateEnum.UNKNOWN;
/**
* Creates a server connection, using a given socket.
* @param socket The socket which connects to the client.
* @param latestMessages Latest race snapshot to send to client.
* @throws IOException Thrown if there is a problem with the client socket.
*/
public ServerConnection(Socket socket, LatestMessages latestMessages) throws IOException {
this.socket = socket;
this.latestMessages = latestMessages;
this.outputQueue = new LinkedBlockingQueue<>();
this.inputQueue = new LinkedBlockingQueue<>();
this.messageSerialiser = new MessageSerialiser(socket.getOutputStream(), outputQueue);
this.messageDeserialiser = new MessageDeserialiser(socket.getInputStream(), inputQueue);
new Thread(messageSerialiser, "ServerConnection()->MessageSerialiser thread " + messageSerialiser).start();
new Thread(messageDeserialiser, "ServerConnection()->MessageDeserialiser thread " + messageDeserialiser).start();
}
@Override
public void run() {
try {
handshake();
} catch (HandshakeException e) {
Logger.getGlobal().log(Level.WARNING, "Server handshake failed.", e);
Thread.currentThread().interrupt();
return;
}
}
/**
* Initiates the handshake with the server.
* @throws HandshakeException Thrown if something goes wrong with the handshake.
*/
private void handshake() throws HandshakeException {
//This function is a bit messy, and could probably be refactored a bit.
connectionState = ConnectionStateEnum.WAITING_FOR_HANDSHAKE;
sendJoinAcceptanceMessage(RequestToJoinEnum.PARTICIPANT);
JoinAcceptance joinAcceptance = waitForJoinAcceptance();
int allocatedSourceID = 0;
//If we join successfully...
if (joinAcceptance.getAcceptanceType() == JoinAcceptanceEnum.JOIN_SUCCESSFUL) {
allocatedSourceID = joinAcceptance.getSourceID();
//TODO need to do something with the ID - maybe flag the correct visualiser boat as being the client's boat?
this.controllerClient = new ControllerClient(inputQueue);
//new Thread(controllerClient, "ServerConnection.run()->ControllerClient thread " + controllerClient).start();
}
this.visualiserInput = new VisualiserInput(latestMessages, outputQueue);
new Thread(visualiserInput, "ServerConnection.run()->VisualiserInput thread " + visualiserInput).start();
connectionState = ConnectionStateEnum.CONNECTED;
}
/**
* Waits until the server sends a {@link JoinAcceptance} message, and returns it.
* @return The {@link JoinAcceptance} message.
* @throws HandshakeException Thrown if we get interrupted while waiting.
*/
private JoinAcceptance waitForJoinAcceptance() throws HandshakeException {
try {
while (connectionState == ConnectionStateEnum.WAITING_FOR_HANDSHAKE) {
AC35Data message = inputQueue.take();
//We need to wait until they actually send a join request.
if (message.getType() == MessageType.JOIN_ACCEPTANCE) {
return (JoinAcceptance) message;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new HandshakeException("Handshake failed. Thread: " + Thread.currentThread() + " was interrupted while waiting on the incoming message queue.", e);
}
throw new HandshakeException("Handshake was cancelled. Connection state is now: " + connectionState);
}
/**
* Sends the server a {@link RequestToJoin} message.
* @param requestType The type of request to send
* @throws HandshakeException Thrown if the thread is interrupted while placing message on the outgoing message queue.
*/
private void sendJoinAcceptanceMessage(RequestToJoinEnum requestType) throws HandshakeException {
//Send them the source ID.
RequestToJoin requestToJoin = new RequestToJoin(requestType);
try {
outputQueue.put(requestToJoin);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new HandshakeException("Handshake failed. Thread: " + Thread.currentThread() + " interrupted while placing RequestToJoin message on outgoing message queue.", e);
}
}
/**
* Determines whether or not this connection is still alive.
* This is based off whether the {@link MessageSerialiser} is still alive.
* @return True if it is alive, false otherwise.
*/
public boolean isAlive() {
return messageSerialiser.isRunning();
}
}

@ -0,0 +1,126 @@
package mock.model;
import mock.exceptions.SourceIDAllocationException;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.*;
/**
* Tests if allocating source IDs works.
*/
public class SourceIdAllocatorTest {
/**
* This is the list of source IDs that we start with.
*/
private List<Integer> originalSourceIDs;
/**
* Used to allocate source IDs.
*/
private SourceIdAllocator sourceIdAllocator;
@Before
public void setUp() throws Exception {
originalSourceIDs = new ArrayList<>();
originalSourceIDs.add(120);
originalSourceIDs.add(121);
originalSourceIDs.add(122);
originalSourceIDs.add(123);
originalSourceIDs.add(124);
originalSourceIDs.add(125);
sourceIdAllocator = new SourceIdAllocator(originalSourceIDs);
}
/**
* Tests that allocation fails when we don't have any source IDs to allocate.
*/
@Test
public void emptyAllocationTest() {
SourceIdAllocator allocator = new SourceIdAllocator(new ArrayList<>());
try {
int sourceID = allocator.allocateSourceID();
fail("Exception should have been thrown, but wasn't.");
} catch (SourceIDAllocationException e) {
//We expect this exception to be thrown - success.
}
}
/**
* Tests that we can allocate a source ID.
* @throws Exception Thrown in case of error.
*/
@Test
public void allocationTest() throws Exception {
int sourceID = sourceIdAllocator.allocateSourceID();
}
/**
* Tests that we can allocate source IDs, but it will eventually be unable to allocate source IDs.
*/
@Test
public void allocationEventuallyFailsTest() {
while (true) {
try {
int sourceID = sourceIdAllocator.allocateSourceID();
} catch (SourceIDAllocationException e) {
//We expect to encounter this exception after enough allocations - success.
break;
}
}
}
/**
* Tests if we can allocate a source ID, return it, and reallocate it.
* @throws Exception Thrown in case of error.
*/
@Test
public void reallocationTest() throws Exception {
List<Integer> sourceIDList = new ArrayList<>();
sourceIDList.add(123);
SourceIdAllocator sourceIdAllocator = new SourceIdAllocator(sourceIDList);
//Allocate.
int sourceID = sourceIdAllocator.allocateSourceID();
//Return.
sourceIdAllocator.returnSourceID(sourceID);
//Reallocate.
int sourceID2 = sourceIdAllocator.allocateSourceID();
}
}

@ -0,0 +1,31 @@
package mock.model.commandFactory;
import mock.model.MockRace;
import network.Messages.Enums.BoatActionEnum;
import org.junit.Before;
import org.junit.Test;
import shared.model.Boat;
import shared.model.Race;
import visualiser.model.VisualiserRace;
import static org.testng.Assert.*;
/**
* Created by connortaylorbrown on 4/08/17.
*/
public class WindCommandTest {
private Race race;
private Boat boat;
private Command upwind;
private Command downwind;
@Before
public void setUp() {
boat = new Boat(0, "Bob", "NZ");
}
@Test
public void upwindCommandDecreasesAngle() {
}
}
Loading…
Cancel
Save