package seng302; import org.opengis.style.Mark; import org.xml.sax.SAXException; import seng302.Mock.*; import seng302.Networking.BinaryMessageDecoder; import seng302.Networking.MessageDecoders.*; import seng302.Networking.Utils.*; import javax.xml.parsers.ParserConfigurationException; import java.io.*; import java.net.*; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.text.ParseException; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import static seng302.Networking.Utils.ByteConverter.bytesToInt; import static seng302.Networking.Utils.ByteConverter.bytesToShort; import static seng302.Networking.Utils.MessageType.*; /** * TCP server to act as the mock AC35 streaming interface */ public class VisualiserInput implements Runnable { //time since last heartbeat private long lastHeartbeatTime; //socket port 4945 as 4940 is ac35 live port and 4941 is ac35 test port private Socket connectionSocket; long heartbeatSeqNum; private StreamedCourse course = null; private Map boatLocation; private RaceStatus raceStatus; private Map boatStatus; private AverageWind averageWind; private CourseWinds courseWinds; private Map markRounding; private RaceStartStatus raceStartStatus; public VisualiserInput(Socket socket, StreamedCourse course) throws IOException{ this.connectionSocket = socket; // this.connectionSocket = new Socket("livedata.americascup.com",4941); this.course = course; this.boatLocation = new HashMap<>(); this.boatStatus = new HashMap<>(); this.markRounding = new HashMap<>(); //start Time this.lastHeartbeatTime = System.currentTimeMillis(); } /** * Provides StreamedCourse container for fixed course data * @return course for current VisualiserInput instance * @see seng302.Mock.StreamedCourse */ public StreamedCourse getCourse() { return course; } /** * Returns the last boat location message associated with the given boat source ID. * @param sourceID unique global identifier for boat * @return most recent location message */ public BoatLocationMessage getBoatLocationMessage(int sourceID) { return boatLocation.get(sourceID); } /** * calculates the time since last heartbeat * @return time since last heartbeat */ private double timeSinceHeartbeat() { long now = System.currentTimeMillis(); return (now - lastHeartbeatTime) / 1000.0; } /** * Returns the boat locations * @return locations of where the boats are */ public Map getBoatLocation() { return boatLocation; } /** * Gets the status of the race * @return the status of the race */ public RaceStatus getRaceStatus() { return raceStatus; } /** * Hashmap of the boat status' * @return Hash map of boat status */ public Map getBoatStatus() { return boatStatus; } /** * Returns the average wind of the race * @return average wind in the race */ public AverageWind getAverageWind() { return averageWind; } /** * Returns winds in the course * @return winds that are in the course */ public CourseWinds getCourseWinds() { return courseWinds; } /** * Returns get Mark Rounding Boat Source ID, MarkRound * @return */ public Map getMarkRounding() { return markRounding; } /** * Takes an inputStream and reads the first 15 bytes (the header) and gets the message length * for the whole message then reads that and returns the byte array * @param inStream inputStream from socket * @return encoded binary messsage bytes * @throws IOException */ private static byte[] getBytes(InputStream inStream) throws IOException { byte[] headerBytes = new byte[15]; inStream.read(headerBytes); byte[] messageLenBytes = Arrays.copyOfRange(headerBytes, 13, 15); short messageLen = bytesToShort(messageLenBytes); byte[] messageBytesWithCRC = new byte[messageLen+4]; while (inStream.available() < messageLen + 4){ //WE NEED THIS!!!! } inStream.read(messageBytesWithCRC, 0, messageLen + 4); ByteBuffer binaryMessageBytes = ByteBuffer.allocate(headerBytes.length+messageBytesWithCRC.length); binaryMessageBytes.put(headerBytes); binaryMessageBytes.put(messageBytesWithCRC); return binaryMessageBytes.array(); } public void run(){ try{ //receiver loop that gets the input boolean receiverLoop = true; while(receiverLoop) { //gets the input from the socket InputStream inFromClient = connectionSocket.getInputStream(); //converts the input into a byte array that can be read by the decoder byte[] binaryMessage = getBytes(inFromClient); //decode the binary message into readable date BinaryMessageDecoder testDecoder = new BinaryMessageDecoder(binaryMessage); AC35Data data = testDecoder.decode(); if (data == null){ continue; } //checks which message is being received and does what is needed for that message MessageType mType = data.getType(); switch (mType) { case HEARTBEAT: lastHeartbeatTime = System.currentTimeMillis(); //note: if the program runs for over 340 years, this will crash. heartbeatSeqNum = ByteConverter.bytesToLong(testDecoder.getMessage()); // System.out.println("HeartBeat Message! " + heartbeatSeqNum); break; case RACESTATUS: System.out.println("Race Status Message"); raceStatus = ((RaceStatus) data); for (BoatStatusMessage msg: raceStatus.getBoatStatusMessages()){ boatStatus.put(msg.getSourceID(), msg); } break; case DISPLAYTEXTMESSAGE: // System.out.println("Display Text Message"); //no decoder for this. break; case XMLMESSAGE: System.out.println("XML Message!"); XMLMessage xml = (XMLMessage) data; try { if (xml.getXmlMsgSubType() == xml.XMLTypeRegatta){ System.out.println("Setting Regatta"); course.setRegattaXMLReader(new RegattaXMLReader(xml.getXmlMessage())); } else if (xml.getXmlMsgSubType() == xml.XMLTypeRace){ System.out.println("Setting Course"); course.setStreamedCourseXMLReader(new StreamedCourseXMLReader(xml.getXmlMessage())); } else if (xml.getXmlMsgSubType() == xml.XMLTypeBoat){ System.out.println("Setting Boats"); course.setBoatXMLReader(new BoatXMLReader(xml.getXmlMessage())); } } catch (SAXException e) { e.printStackTrace(); } catch (ParserConfigurationException e) { e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); } catch (StreamedCourseXMLException e) { e.printStackTrace(); } break; case RACESTARTSTATUS: //System.out.println("Race Start Status Message"); RaceStartStatus rSS = (RaceStartStatus) data; raceStartStatus = rSS; break; case YACHTEVENTCODE: // System.out.println("Yacht Action Code!"); //no decoder break; case YACHTACTIONCODE: // System.out.println("Yacht Action Code!"); //no decoder break; case CHATTERTEXT: // System.out.println("Chatter Text Message!"); //no decoder break; case BOATLOCATION: System.out.println("Boat Location!"); BoatLocationMessage msg = (BoatLocationMessage) data; if (boatLocation.containsKey(msg.getSourceID())){ if (msg.getTime() > boatLocation.get(msg.getSourceID()).getTime()){ boatLocation.put(msg.getSourceID(), msg); } }else{ boatLocation.put(msg.getSourceID(), msg); } // System.out.println("Boat Location Message!"); break; case MARKROUNDING: // System.out.println("Mark Rounding Message!"); MarkRounding mkrounding = (MarkRounding) data; markRounding.put(mkrounding.getSourceID(), mkrounding); break; case COURSEWIND: // System.out.println("Course Wind Message!"); courseWinds = (CourseWinds) data; break; case AVGWIND: // System.out.println("Average Wind Message!"); AverageWind avgWind = (AverageWind) data; averageWind = avgWind; break; default: System.out.println("Broken Message!"); break; } //if no heartbeat has been received in more than 6 seconds //the connection will need to be restarted if (timeSinceHeartbeat() > 6){ System.out.println("Connection has stopped, trying to reconnect"); receiverLoop = false; } } }catch(IOException e){ e.printStackTrace(); } } public static void main(String argv[]) throws Exception { Socket socket = new Socket(InetAddress.getLocalHost(), 4943); VisualiserInput receiver = new VisualiserInput(socket, new StreamedCourse()); receiver.run(); } }