@ -1,77 +1,89 @@
package seng302 ;
import org.opengis.style.Mark ;
import org.xml.sax.SAXException ;
import seng302.Mock.* ;
import seng302.Networking.BinaryMessageDecoder ;
import seng302.Networking. MessageDecoders.* ;
import seng302.Networking. Util s.*;
import seng302.Networking. Exceptions.InvalidMessageException ;
import seng302.Networking. Message s.*;
import javax.xml.parsers.ParserConfigurationException ;
import java.io.* ;
import java.net.* ;
import java.nio.Buffer ;
import java.nio.ByteBuffer ;
import java.nio.ByteOrder ;
import java.text.ParseException ;
import java.util.Arrays ;
import java.util.HashMap ;
import java.util.Map ;
import java.util.concurrent. TimeUnit ;
import java.util.concurrent. ArrayBlockingQueue ;
import static seng302.Networking.Utils.ByteConverter.bytesToInt ;
import static seng302.Networking.Utils.ByteConverter.bytesToShort ;
import static seng302.Networking.Utils.MessageType.* ;
/ * *
* TCP server to act as the mock AC35 streaming interface
* TCP client which receives packets / messages from a race data source ( e . g . , mock source , official source ) , and exposes them to any observers .
* /
public class VisualiserInput implements Runnable
{
//Time since last heartbeat.
private long lastHeartbeatTime ;
public class VisualiserInput implements Runnable {
// socket port 4945 as 4940 is ac35 live port and 4941 is ac35 test port
private Socket connectionSocket ;
///A queue that contains messages that have been received, and need to be handled.
private ArrayBlockingQueue < AC35Data > messagesReceivedQueue = new ArrayBlockingQueue < > ( 1000000 ) ; //We have room for 1,000,000. Is this much capacity actually needed?
long heartbeatSeqNum ;
///Timestamp of the last heartbeat.
private long lastHeartbeatTime = - 1 ;
///Sequence number of the last heartbeat.
private long lastHeartbeatSequenceNum = - 1 ;
///How long we should wait for a heartbeat before assuming the connection is dead. Measured in milliseconds.
private long heartBeatPeriod = 10 * 1000 ;
private StreamedCourse course = null ;
///The socket that we have connected to.
private Socket connectionSocket ;
private Map < Integer , BoatLocationMessage > boatLocation ;
///Object to store parsed course data. //TODO comment?
private StreamedCourse course ;
///The last RaceStatus message received.
private RaceStatus raceStatus ;
private Map < Integer , BoatStatusMessage > boatStatus ;
///A map of the last BoatStatus message received, for each boat.
private Map < Integer , BoatStatus > boatStatusMap = new HashMap < > ( ) ; ;
///A map of the last BoatLocation message received, for each boat.
private Map < Integer , BoatLocation > boatLocationMap = new HashMap < > ( ) ;
///The last AverageWind message received.
private AverageWind averageWind ;
///The last CourseWinds message received.
private CourseWinds courseWinds ;
private Map < Integer , MarkRounding > markRounding ;
///A map of the last MarkRounding message received, for each boat.
private Map < Integer , MarkRounding > markRoundingMap = new HashMap < > ( ) ;
///The last RaceStartStatus message received.
private RaceStartStatus raceStartStatus ;
private BufferedInputStream inStream ;
///InputStream (from the socket).
private DataInputStream inStream ;
///TODO comment?
private boolean receiverLoop = true ;
public VisualiserInput ( Socket socket , StreamedCourse course ) throws IOException {
/ * *
* Ctor .
* @param socket Socket from which we will receive race data .
* @param course TODO comment ?
* @throws IOException If there is something wrong with the socket ' s input stream .
* /
public VisualiserInput ( Socket socket , StreamedCourse course ) throws IOException {
this . connectionSocket = socket ;
// this.connectionSocket = new Socket("livedata.americascup.com",4941);
this . inStream = new BufferedInputStream ( connectionSocket . getInputStream ( ) ) ;
//We wrap a DataInputStream around the socket's InputStream because it has the stream.readFully(buffer) function, which is a blocking read until the buffer has been filled.
this . inStream = new Data InputStream( connectionSocket . getInputStream ( ) ) ;
this . course = course ;
this . boatLocation = new HashMap < > ( ) ;
this . boatStatus = new HashMap < > ( ) ;
this . markRounding = new HashMap < > ( ) ;
//start Time
this . lastHeartbeatTime = System . currentTimeMillis ( ) ;
}
/ * *
* Provides StreamedCourse container for fixed course data
* @return c ourse for current VisualiserInput instance
* Provides StreamedCourse container for fixed course data .
* @return C ourse for current VisualiserInput instance .
* @see seng302 . Mock . StreamedCourse
* /
public StreamedCourse getCourse ( ) {
@ -80,228 +92,342 @@ public class VisualiserInput implements Runnable
/ * *
* Returns the last boat location message associated with the given boat source ID .
* @param sourceID u nique global identifier for boat
* @return most recent location message
* @param sourceID U nique global identifier for the boat.
* @return The most recent location message .
* /
public BoatLocation Message getBoatLocationMessage ( int sourceID ) {
return boatLocation . get ( sourceID ) ;
public BoatLocation getBoatLocationMessage ( int sourceID ) {
return boatLocation Map . get ( sourceID ) ;
}
/ * *
* c alculates the time since last heartbeat
* @return t ime since last heartbeat
* C alculates the time since last heartbeat , in milliseconds .
* @return T ime since last heartbeat , in milliseconds . .
* /
private double timeSinceHeartbeat ( ) {
long now = System . currentTimeMillis ( ) ;
return ( now - lastHeartbeatTime ) / 1000.0 ;
return ( now - lastHeartbeatTime ) ;
}
/ * *
* Returns the boat locations
* @return locations of where the boats are
* Returns the boat locations map . Maps between Integer ( Boat ID ) - > BoatLocation .
* @return Map of boat locations .
* /
public Map < Integer , BoatLocation Message > getBoatLocation ( ) {
return boatLocation ;
public Map < Integer , BoatLocation > getBoatLocation Map ( ) {
return boatLocation Map ;
}
/ * *
* Gets the status of the race
* @return t he status of the race
* Gets the status of the race .
* @return T he status of the race .
* /
public RaceStatus getRaceStatus ( ) {
return raceStatus ;
}
/ * *
* Hashmap of the boat status '
* @return Hash map of boat status
* Returns the boat statuses map . Maps between Integer ( Boat ID ) - > BoatStatus .
* @return Map of boat statuses .
* /
public Map < Integer , BoatStatus Message > getBoatStatus ( ) {
return boatStatus ;
public Map < Integer , BoatStatus > getBoatStatus Map ( ) {
return boatStatus Map ;
}
/ * *
* Returns the average wind of the race
* @return a verage wind in the race
* Returns the average wind of the race .
* @return A verage wind in the race .
* /
public AverageWind getAverageWind ( ) {
return averageWind ;
}
/ * *
* Returns winds in the course
* @return w inds that are in the course
* Returns winds in the course .
* @return W inds that are in the course .
* /
public CourseWinds getCourseWinds ( ) {
return courseWinds ;
}
/ * *
* Returns get Mark Rounding Boat Source ID , MarkRound
* @return the mark rounding
* Returns the mark roundings map . Maps between Integer ( Boat ID ) - > MarkRounding .
* @return Map of mark roundings .
* /
public Map < Integer , MarkRounding > getMarkRounding ( ) {
return markRounding ;
public Map < Integer , MarkRounding > getMarkRounding Map ( ) {
return markRounding Map ;
}
/ * *
* Takes an inputStream and reads the first 15 bytes ( the header ) and gets the message length
* for the whole message then reads that and returns the byte array
* @return encoded binary messsage bytes
* @throws IOException made by the inputstream reading
* Reads and returns the next message as an array of bytes from the socket . Use getNextMessage ( ) to get the actual message object instead .
* @return Encoded binary message bytes .
* @throws IOException Thrown when an error occurs while reading from the socket .
* /
private byte [ ] getBytes ( ) throws IOException {
inStream . mark ( 0 ) ;
if ( inStream . available ( ) < 15 ) return null ; //if there is not enough bytes foer the headerr
byte [ ] headerBytes = new byte [ 15 ] ;
inStream . read ( headerBytes ) ;
byte [ ] messageLenBytes = Arrays . copyOfRange ( headerBytes , 13 , 15 ) ;
short messageLen = bytesToShort ( messageLenBytes ) ;
byte [ ] messageBytesWithCRC = new byte [ messageLen + 4 ] ;
if ( inStream . available ( ) < messageLen + 4 ) { //if the message is not long enough
inStream . reset ( ) ; //reset pointer.
return null ;
}
inStream . read ( messageBytesWithCRC , 0 , messageLen + 4 ) ;
ByteBuffer binaryMessageBytes = ByteBuffer . allocate ( headerBytes . length + messageBytesWithCRC . length ) ;
binaryMessageBytes . put ( headerBytes ) ;
binaryMessageBytes . put ( messageBytesWithCRC ) ;
return binaryMessageBytes . array ( ) ;
private byte [ ] getNextMessageBytes ( ) throws IOException {
short CRCLength = 4 ;
short headerLength = 15 ;
//Read the header of the next message.
byte [ ] headerBytes = new byte [ headerLength ] ;
inStream . readFully ( headerBytes ) ;
//Read the message body length.
byte [ ] messageBodyLengthBytes = Arrays . copyOfRange ( headerBytes , headerLength - 2 , headerLength ) ;
short messageBodyLength = bytesToShort ( messageBodyLengthBytes ) ;
//Read the message body.
byte [ ] messageBodyBytes = new byte [ messageBodyLength ] ;
inStream . readFully ( messageBodyBytes ) ;
//Read the message CRC.
byte [ ] messageCRCBytes = new byte [ CRCLength ] ;
inStream . readFully ( messageCRCBytes ) ;
//Put the head + body + crc into one large array.
ByteBuffer messageBytes = ByteBuffer . allocate ( headerBytes . length + messageBodyBytes . length + messageCRCBytes . length ) ;
messageBytes . put ( headerBytes ) ;
messageBytes . put ( messageBodyBytes ) ;
messageBytes . put ( messageCRCBytes ) ;
return messageBytes . array ( ) ;
}
/ * *
* Reads and returns the next message object from the socket .
* @return The message object . Use instanceof for concrete type .
* @throws IOException Thrown when an error occurs while reading from the socket .
* @throws InvalidMessageException Thrown when the message is invalid in some way .
* /
private AC35Data getNextMessage ( ) throws IOException , InvalidMessageException
{
//Get the next message from the socket as a block of bytes.
byte [ ] messageBytes = this . getNextMessageBytes ( ) ;
//Decode the binary message into an appropriate message object.
BinaryMessageDecoder decoder = new BinaryMessageDecoder ( messageBytes ) ;
AC35Data message = decoder . decode ( ) ;
return message ;
}
/ * *
* Main loop which reads messages from the socket , and exposes them .
* /
public void run ( ) {
this . receiverLoop = true ;
try {
//receiver loop that gets the input
while ( receiverLoop ) {
//if no heartbeat has been received in more than 6 seconds
//the connection will need to be restarted
if ( timeSinceHeartbeat ( ) > 6 ) {
System . out . println ( "Connection has stopped, trying to reconnect" ) ;
receiverLoop = false ;
//receiver loop that gets the input
while ( receiverLoop ) {
//If no heartbeat has been received in more the heartbeat period
//then the connection will need to be restarted.
System . out . println ( "time since last heartbeat: " + timeSinceHeartbeat ( ) ) ; //TEMP REMOVE
if ( timeSinceHeartbeat ( ) > this . heartBeatPeriod ) {
System . out . println ( "Connection has stopped, trying to reconnect." ) ;
//Attempt to reconnect the socket.
try { //This attempt doesn't really work. Under what circumstances would
this . connectionSocket = new Socket ( this . connectionSocket . getInetAddress ( ) , this . connectionSocket . getPort ( ) ) ;
//this.connectionSocket.connect(this.connectionSocket.getRemoteSocketAddress());
//Reset the heartbeat timer.
this . lastHeartbeatTime = System . currentTimeMillis ( ) ;
}
catch ( IOException e ) {
System . err . println ( "Unable to reconnect." ) ;
//Wait 500ms. Ugly hack, should refactor.
long waitPeriod = 500 ;
long waitTimeStart = System . currentTimeMillis ( ) + waitPeriod ;
while ( System . currentTimeMillis ( ) < waitTimeStart ) {
//Nothing. Busyloop.
}
//Swallow the exception.
continue ;
}
}
//Reads the next message.
AC35Data message = null ;
try {
message = this . getNextMessage ( ) ;
}
catch ( InvalidMessageException | IOException e ) {
//Prints exception to stderr, and iterate loop (that is, read the next message).
System . err . println ( "Unable to read message: " + e . getMessage ( ) ) ;
//Continue to the next loop iteration/message.
continue ;
}
//Add it to message queue.
this . messagesReceivedQueue . add ( message ) ;
//Checks which message is being received and does what is needed for that message.
//Heartbeat.
if ( message instanceof Heartbeat ) {
Heartbeat heartbeat = ( Heartbeat ) message ;
//Check that the heartbeat number is greater than the previous value, and then set the last heartbeat time.
if ( heartbeat . getSequenceNumber ( ) > this . lastHeartbeatSequenceNum ) {
lastHeartbeatTime = System . currentTimeMillis ( ) ;
lastHeartbeatSequenceNum = heartbeat . getSequenceNumber ( ) ;
//System.out.println("HeartBeat Message! " + lastHeartbeatSequenceNum);
}
//converts the input into a byte array that can be read by the decoder
byte [ ] binaryMessage = getBytes ( ) ;
//if there is no bytes read.
if ( binaryMessage = = null ) {
continue ;
}
//RaceStatus.
else if ( message instanceof RaceStatus ) {
RaceStatus raceStatus = ( RaceStatus ) message ;
//System.out.println("Race Status Message");
this . raceStatus = raceStatus ;
for ( BoatStatus boatStatus : this . raceStatus . getBoatStatuses ( ) ) {
this . boatStatusMap . put ( boatStatus . getSourceID ( ) , boatStatus ) ;
}
}
//DisplayTextMessage.
/ * else if ( message instanceof DisplayTextMessage ) {
//System.out.println("Display Text Message");
//No decoder for this.
} * /
//XMLMessage.
else if ( message instanceof XMLMessage ) {
XMLMessage xmlMessage = ( XMLMessage ) message ;
//System.out.println("XML Message!");
if ( xmlMessage . getXmlMsgSubType ( ) = = xmlMessage . XMLTypeRegatta ) {
//System.out.println("Setting Regatta");
try {
course . setRegattaXMLReader ( new RegattaXMLReader ( xmlMessage . getXmlMessage ( ) ) ) ;
}
//TODO REFACTOR should put all of these exceptions behind a RegattaXMLReaderException.
catch ( IOException | SAXException | ParserConfigurationException e ) {
System . err . println ( "Error creating RegattaXMLReader: " + e . getMessage ( ) ) ;
//Continue to the next loop iteration/message.
continue ;
}
} else if ( xmlMessage . getXmlMsgSubType ( ) = = xmlMessage . XMLTypeRace ) {
//System.out.println("Setting Course");
try {
course . setStreamedCourseXMLReader ( new StreamedCourseXMLReader ( xmlMessage . getXmlMessage ( ) ) ) ;
}
//TODO REFACTOR should put all of these exceptions behind a StreamedCourseXMLReaderException.
catch ( IOException | SAXException | ParserConfigurationException | ParseException | StreamedCourseXMLException e ) {
System . err . println ( "Error creating StreamedCourseXMLReader: " + e . getMessage ( ) ) ;
//Continue to the next loop iteration/message.
continue ;
}
} else if ( xmlMessage . getXmlMsgSubType ( ) = = xmlMessage . XMLTypeBoat ) {
//System.out.println("Setting Boats");
try
{
course . setBoatXMLReader ( new BoatXMLReader ( xmlMessage . getXmlMessage ( ) ) ) ;
}
//TODO REFACTOR should put all of these exceptions behind a BoatXMLReaderException.
catch ( IOException | SAXException | ParserConfigurationException e ) {
System . err . println ( "Error creating BoatXMLReader: " + e . getMessage ( ) ) ;
//Continue to the next loop iteration/message.
continue ;
}
//decode the binary message into readable date
BinaryMessageDecoder testDecoder = new BinaryMessageDecoder ( binaryMessage ) ;
AC35Data data = testDecoder . decode ( ) ;
if ( data = = null ) {
continue ;
}
//checks which message is being received and does what is needed for that message
MessageType mType = data . getType ( ) ;
switch ( mType ) {
case HEARTBEAT :
lastHeartbeatTime = System . currentTimeMillis ( ) ;
//note: if the program runs for over 340 years, this will crash.
heartbeatSeqNum = ByteConverter . bytesToLong ( testDecoder . getMessage ( ) ) ;
// System.out.println("HeartBeat Message! " + heartbeatSeqNum);
break ;
case RACESTATUS :
// System.out.println("Race Status Message");
raceStatus = ( ( RaceStatus ) data ) ;
for ( BoatStatusMessage msg : raceStatus . getBoatStatusMessages ( ) ) {
boatStatus . put ( msg . getSourceID ( ) , msg ) ;
}
break ;
case DISPLAYTEXTMESSAGE :
// System.out.println("Display Text Message");
//no decoder for this.
break ;
case XMLMESSAGE :
System . out . println ( "XML Message!" ) ;
XMLMessage xml = ( XMLMessage ) data ;
try {
if ( xml . getXmlMsgSubType ( ) = = xml . XMLTypeRegatta ) {
System . out . println ( "Setting Regatta" ) ;
course . setRegattaXMLReader ( new RegattaXMLReader ( xml . getXmlMessage ( ) ) ) ;
} else if ( xml . getXmlMsgSubType ( ) = = xml . XMLTypeRace ) {
System . out . println ( "Setting Course" ) ;
course . setStreamedCourseXMLReader ( new StreamedCourseXMLReader ( xml . getXmlMessage ( ) ) ) ;
} else if ( xml . getXmlMsgSubType ( ) = = xml . XMLTypeBoat ) {
System . out . println ( "Setting Boats" ) ;
course . setBoatXMLReader ( new BoatXMLReader ( xml . getXmlMessage ( ) ) ) ;
}
} catch ( SAXException e ) {
e . printStackTrace ( ) ;
} catch ( ParserConfigurationException e ) {
e . printStackTrace ( ) ;
} catch ( ParseException e ) {
e . printStackTrace ( ) ;
} catch ( StreamedCourseXMLException e ) {
e . printStackTrace ( ) ;
}
break ;
case RACESTARTSTATUS :
//System.out.println("Race Start Status Message");
RaceStartStatus rSS = ( RaceStartStatus ) data ;
raceStartStatus = rSS ;
break ;
case YACHTEVENTCODE :
// System.out.println("Yacht Action Code!");
//no decoder
break ;
case YACHTACTIONCODE :
// System.out.println("Yacht Action Code!");
//no decoder
break ;
case CHATTERTEXT :
// System.out.println("Chatter Text Message!");
//no decoder
break ;
case BOATLOCATION :
// System.out.println("Boat Location!");
BoatLocationMessage msg = ( BoatLocationMessage ) data ;
if ( boatLocation . containsKey ( msg . getSourceID ( ) ) ) {
if ( msg . getTime ( ) > boatLocation . get ( msg . getSourceID ( ) ) . getTime ( ) ) {
boatLocation . put ( msg . getSourceID ( ) , msg ) ;
}
} else {
boatLocation . put ( msg . getSourceID ( ) , msg ) ;
}
// System.out.println("Boat Location Message!");
break ;
case MARKROUNDING :
// System.out.println("Mark Rounding Message!");
MarkRounding mkrounding = ( MarkRounding ) data ;
markRounding . put ( mkrounding . getSourceID ( ) , mkrounding ) ;
break ;
case COURSEWIND :
// System.out.println("Course Wind Message!");
courseWinds = ( CourseWinds ) data ;
break ;
case AVGWIND :
// System.out.println("Average Wind Message!");
AverageWind avgWind = ( AverageWind ) data ;
averageWind = avgWind ;
break ;
default :
System . out . println ( "Broken Message!" ) ;
break ;
}
//RaceStartStatus.
else if ( message instanceof RaceStartStatus ) {
RaceStartStatus raceStartStatus = ( RaceStartStatus ) message ;
//System.out.println("Race Start Status Message");
this . raceStartStatus = raceStartStatus ;
}
//YachtEventCode.
/ * else if ( message instanceof YachtEventCode ) {
YachtEventCode yachtEventCode = ( YachtEventCode ) message ;
//System.out.println("Yacht Event Code!");
//No decoder for this.
} * /
//YachtActionCode.
/ * else if ( message instanceof YachtActionCode ) {
YachtActionCode yachtActionCode = ( YachtActionCode ) message ;
//System.out.println("Yacht Action Code!");
//No decoder for this.
} * /
//ChatterText.
/ * else if ( message instanceof ChatterText ) {
ChatterText chatterText = ( ChatterText ) message ;
//System.out.println("Chatter Text Message!");
//No decoder for this.
} * /
//BoatLocation.
else if ( message instanceof BoatLocation ) {
BoatLocation boatLocation = ( BoatLocation ) message ;
//System.out.println("Boat Location!");
if ( this . boatLocationMap . containsKey ( boatLocation . getSourceID ( ) ) ) {
//If our boatlocation map already contains a boat location message for this boat, check that the new message is actually for a later timestamp (i.e., newer).
if ( boatLocation . getTime ( ) > this . boatLocationMap . get ( boatLocation . getSourceID ( ) ) . getTime ( ) ) {
//If it is, replace the old message.
this . boatLocationMap . put ( boatLocation . getSourceID ( ) , boatLocation ) ;
}
} else {
//If the map _doesn't_ already contain a message for this boat, insert the message.
this . boatLocationMap . put ( boatLocation . getSourceID ( ) , boatLocation ) ;
}
}
} catch ( IOException e ) {
e . printStackTrace ( ) ;
}
}
//MarkRounding.
else if ( message instanceof MarkRounding ) {
MarkRounding markRounding = ( MarkRounding ) message ;
//System.out.println("Mark Rounding Message!");
if ( this . markRoundingMap . containsKey ( markRounding . getSourceID ( ) ) ) {
//If our markRoundingMap already contains a mark rounding message for this boat, check that the new message is actually for a later timestamp (i.e., newer).
if ( markRounding . getTime ( ) > this . markRoundingMap . get ( markRounding . getSourceID ( ) ) . getTime ( ) ) {
//If it is, replace the old message.
this . markRoundingMap . put ( markRounding . getSourceID ( ) , markRounding ) ;
}
} else {
//If the map _doesn't_ already contain a message for this boat, insert the message.
this . markRoundingMap . put ( markRounding . getSourceID ( ) , markRounding ) ;
}
public boolean isReceiverLoop ( ) {
return receiverLoop ;
}
}
//CourseWinds.
else if ( message instanceof CourseWinds ) {
CourseWinds courseWinds = ( CourseWinds ) message ;
public static void main ( String argv [ ] ) throws Exception
{
Socket socket = new Socket ( "livedata.americascup.com" , 4941 ) ;
// Socket socket = new Socket(InetAddress.getLocalHost(), 4942);
VisualiserInput receiver = new VisualiserInput ( socket , new StreamedCourse ( ) ) ;
receiver . run ( ) ;
//System.out.println("Course Wind Message!");
this . courseWinds = courseWinds ;
}
//AverageWind.
else if ( message instanceof AverageWind ) {
AverageWind averageWind = ( AverageWind ) message ;
//System.out.println("Average Wind Message!");
this . averageWind = averageWind ;
}
//Unrecognised message.
else {
System . out . println ( "Broken Message!" ) ;
}
}
}
}