Added names to threads created.

MockOutput had a nested while loop, but it wasn't needed. Also tidied some of the error handling in MockOutput.

#story[1095]
main
fjc40 9 years ago
parent 134586f407
commit 634d78ab70

@ -51,10 +51,10 @@ public class ConnectionAcceptor implements Runnable {
*/
public ConnectionAcceptor(LatestMessages latestMessages) throws IOException {
this.latestMessages =latestMessages;
this.latestMessages = latestMessages;
this.serverSocket = new ServerSocket(serverPort);
CheckClientConnection checkClientConnection = new CheckClientConnection(mockOutputList);
new Thread(checkClientConnection).start();
new Thread(checkClientConnection, "ConnectionAcceptor()->CheckClientConnection thread").start();
}
public String getAddress() throws UnknownHostException {
@ -70,19 +70,30 @@ public class ConnectionAcceptor implements Runnable {
*/
@Override
public void run() {
while(true){//should be connections not filled up
while(mockOutputList.remainingCapacity() > 0) {
try {
System.out.println("Waiting for a connection...");//TEMP DEBUG REMOVE
Socket mockSocket = serverSocket.accept();
//TODO at this point we need to assign the connection a boat source ID, if they requested to participate.
DataOutputStream outToVisualiser = new DataOutputStream(mockSocket.getOutputStream());
MockOutput mockOutput = new MockOutput(latestMessages, outToVisualiser);
ControllerServer controllerServer = new ControllerServer(mockSocket);
new Thread(mockOutput).start();
new Thread(controllerServer).start();
ControllerServer controllerServer = new ControllerServer(mockSocket); //TODO probably pass assigned boat source ID into ControllerServer.
new Thread(mockOutput, "ConnectionAcceptor.run()->MockOutput thread" + mockOutput).start();
new Thread(controllerServer, "ConnectionAcceptor.run()->ControllerServer thread" + controllerServer).start();
mockOutputList.add(mockOutput);
System.out.println(String.format("%d number of Visualisers Connected.", mockOutputList.size()));
System.out.println(String.format("%d number of Visualisers Connected.", mockOutputList.size()));//TEMP
} catch (IOException e) {
e.printStackTrace();
e.printStackTrace();//TODO handle this properly
}
}
@ -111,7 +122,7 @@ public class ConnectionAcceptor implements Runnable {
double timeSinceLastHeartBeat = System.currentTimeMillis();
while(true) {
//System.out.println(mocks.size());//used to see current amount of visualisers connected.
ArrayBlockingQueue<MockOutput> m = new ArrayBlockingQueue(16, true, mocks);
ArrayBlockingQueue<MockOutput> m = new ArrayBlockingQueue<>(16, true, mocks);
for (MockOutput mo : m) {
try {
mo.sendHeartBeat();

@ -35,7 +35,7 @@ public class Event {
private Polars boatPolars;
private ConnectionAcceptor mockOutput;
private ConnectionAcceptor connectionAcceptor;
private LatestMessages latestMessages;
/**
@ -51,7 +51,7 @@ public class Event {
this.boatPolars = PolarParser.parse("mock/polars/acc_polars.csv");
this.latestMessages = new LatestMessages();
this.mockOutput = new ConnectionAcceptor(latestMessages);
this.connectionAcceptor = new ConnectionAcceptor(latestMessages);
}
catch (IOException e) {
e.printStackTrace();
@ -67,11 +67,11 @@ public class Event {
}
public String getAddress() throws UnknownHostException {
return mockOutput.getAddress();
return connectionAcceptor.getAddress();
}
public int getPort() {
return mockOutput.getServerPort();
return connectionAcceptor.getServerPort();
}
/**
@ -82,7 +82,7 @@ public class Event {
* @throws InvalidRegattaDataException Thrown if the regatta xml file cannot be parsed.
*/
public void start() throws InvalidRaceDataException, XMLReaderException, InvalidBoatDataException, InvalidRegattaDataException {
new Thread(mockOutput).start();
new Thread(connectionAcceptor, "Event.Start()->ConnectionAcceptor thread").start();
sendXMLs();
@ -94,7 +94,7 @@ public class Event {
//Create and start race.
RaceLogic newRace = new RaceLogic(new MockRace(boatDataSource, raceDataSource, regattaDataSource, this.latestMessages, this.boatPolars, Constants.RaceTimeScale), this.latestMessages);
new Thread(newRace).start();
new Thread(newRace, "Event.Start()->RaceLogic thread").start();
}
/**
@ -102,11 +102,11 @@ public class Event {
*/
private void sendXMLs() {
mockOutput.setRegattaXml(regattaXML);
connectionAcceptor.setRegattaXml(regattaXML);
mockOutput.setRaceXml(raceXML);
connectionAcceptor.setRaceXml(raceXML);
mockOutput.setBoatsXml(boatXML);
connectionAcceptor.setBoatsXml(boatXML);
}
/**

@ -54,8 +54,6 @@ public class MockOutput implements Runnable
private int heartbeatSequenceNum = 1;
private boolean stop = false; //whether or not hte thread keeps running
/**
* Ctor.
* @param latestMessages Latests Messages that the Mock is to send out
@ -231,119 +229,124 @@ public class MockOutput implements Runnable
*/
public void run() {
try {
while (!stop){
//Wait until all of the xml files have been set.
if (!this.latestMessages.hasAllXMLMessages()) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
//Wait until all of the xml files have been set.
while (!this.latestMessages.hasAllXMLMessages()) {
long previousFrameTime = System.currentTimeMillis();
boolean sentXMLs = false;
try {
Thread.sleep(500);
} catch (InterruptedException e) {
//If we get interrupted, exit the function.
Logger.getGlobal().log(Level.WARNING, "MockOutput.run().sleep(waitForXMLs) was interrupted on thread: " + Thread.currentThread(), e);
//Re-set the interrupt flag.
Thread.currentThread().interrupt();
return;
}
}
while(true) {
try {
long currentFrameTime = System.currentTimeMillis();
long previousFrameTime = System.currentTimeMillis();
boolean sentXMLs = false;
//This is the time elapsed, in milliseconds, since the last server "frame".
long framePeriod = currentFrameTime - previousFrameTime;
try {
while (!Thread.interrupted()) {
//We only attempt to send packets every X milliseconds.
long minimumFramePeriod = 16;
if (framePeriod >= minimumFramePeriod) {
try {
//Send XML messages.
if (!sentXMLs) {
//Serialise them.
long currentFrameTime = System.currentTimeMillis();
try {
byte[] raceXMLBlob = parseXMLMessage(latestMessages.getRaceXMLMessage());
byte[] regattaXMLBlob = parseXMLMessage(latestMessages.getRegattaXMLMessage());
byte[] boatsXMLBlob = parseXMLMessage(latestMessages.getBoatXMLMessage());
//This is the time elapsed, in milliseconds, since the last server "frame".
long framePeriod = currentFrameTime - previousFrameTime;
//Send them.
outToVisualiser.write(raceXMLBlob);
outToVisualiser.write(regattaXMLBlob);
outToVisualiser.write(boatsXMLBlob);
sentXMLs = true;
//We only attempt to send packets every X milliseconds.
long minimumFramePeriod = 16;
if (framePeriod >= minimumFramePeriod) {
} catch (InvalidMessageException e) {
Logger.getGlobal().log(Level.WARNING, "Could not encode XMLMessage: " + latestMessages.getRaceXMLMessage(), e);
continue; //Go to next iteration.
}
//Send XML messages.
if (!sentXMLs) {
//Serialise them.
try {
byte[] raceXMLBlob = parseXMLMessage(latestMessages.getRaceXMLMessage());
byte[] regattaXMLBlob = parseXMLMessage(latestMessages.getRegattaXMLMessage());
byte[] boatsXMLBlob = parseXMLMessage(latestMessages.getBoatXMLMessage());
//Send them.
outToVisualiser.write(raceXMLBlob);
outToVisualiser.write(regattaXMLBlob);
outToVisualiser.write(boatsXMLBlob);
sentXMLs = true;
} catch (InvalidMessageException e) {
Logger.getGlobal().log(Level.WARNING, "Could not encode XMLMessage: " + latestMessages.getRaceXMLMessage(), e);
continue; //Go to next iteration.
}
//Sends the RaceStatus message.
if (this.latestMessages.getRaceStatus() != null) {
}
try {
byte[] raceStatusBlob = this.parseRaceStatus(this.latestMessages.getRaceStatus());
//Sends the RaceStatus message.
if (this.latestMessages.getRaceStatus() != null) {
this.outToVisualiser.write(raceStatusBlob);
try {
byte[] raceStatusBlob = this.parseRaceStatus(this.latestMessages.getRaceStatus());
} catch (InvalidMessageException e) {
Logger.getGlobal().log(Level.WARNING, "Could not encode RaceStatus: " + latestMessages.getRaceStatus(), e);
}
this.outToVisualiser.write(raceStatusBlob);
} catch (InvalidMessageException e) {
Logger.getGlobal().log(Level.WARNING, "Could not encode RaceStatus: " + latestMessages.getRaceStatus(), e);
}
}
//Send all of the BoatLocation messages.
for (int sourceID : this.latestMessages.getBoatLocationMap().keySet()) {
//Send all of the BoatLocation messages.
for (int sourceID : this.latestMessages.getBoatLocationMap().keySet()) {
//Get the message.
BoatLocation boatLocation = this.latestMessages.getBoatLocation(sourceID);
if (boatLocation != null) {
//Get the message.
BoatLocation boatLocation = this.latestMessages.getBoatLocation(sourceID);
if (boatLocation != null) {
try {
//Encode.
byte[] boatLocationBlob = this.parseBoatLocation(boatLocation);
try {
//Encode.
byte[] boatLocationBlob = this.parseBoatLocation(boatLocation);
//Write it.
this.outToVisualiser.write(boatLocationBlob);
//Write it.
this.outToVisualiser.write(boatLocationBlob);
} catch (InvalidMessageException e) {
Logger.getGlobal().log(Level.WARNING, "Could not encode BoatLocation: " + boatLocation, e);
}
} catch (InvalidMessageException e) {
Logger.getGlobal().log(Level.WARNING, "Could not encode BoatLocation: " + boatLocation, e);
}
}
}
}
previousFrameTime = currentFrameTime;
previousFrameTime = currentFrameTime;
} else {
//Wait until the frame period will be large enough.
long timeToWait = minimumFramePeriod - framePeriod;
try {
Thread.sleep(timeToWait);
} catch (InterruptedException e) {
//If we get interrupted, exit the function.
e.printStackTrace();
//Re-set the interrupt flag.
Thread.currentThread().interrupt();
return;
}
} else {
//Wait until the frame period will be large enough.
long timeToWait = minimumFramePeriod - framePeriod;
try {
Thread.sleep(timeToWait);
} catch (InterruptedException e) {
//If we get interrupted, exit the function.
Logger.getGlobal().log(Level.WARNING, "MockOutput.run().sleep(framePeriod) was interrupted on thread: " + Thread.currentThread(), e);
//Re-set the interrupt flag.
Thread.currentThread().interrupt();
return;
}
} catch (SocketException e) {
break;
}
} catch (SocketException e) {
break;
}
}
} catch (IOException e) {
@ -351,8 +354,5 @@ public class MockOutput implements Runnable
}
}
public void stop(){
stop = true;
}
}

@ -4,6 +4,7 @@ package network.MessageEncoders;
import network.Exceptions.InvalidMessageException;
import network.Exceptions.InvalidMessageTypeException;
import network.Messages.*;
import network.Messages.Enums.MessageType;
import static network.Utils.ByteConverter.*;

Loading…
Cancel
Save