Skip to content
Snippets Groups Projects
Commit cbc09e0d authored by Dmitry Säkki's avatar Dmitry Säkki
Browse files

final version

parent 7ffa854f
Branches
No related tags found
No related merge requests found
Pipeline #59600 passed
...@@ -46,7 +46,25 @@ public class MessageBroker extends Thread { ...@@ -46,7 +46,25 @@ public class MessageBroker extends Thread {
* 7. Return the processed message * 7. Return the processed message
*/ */
private Message process(Object procMessage) { private Message process(Object procMessage) {
return null; if(!(procMessage instanceof Message))return null;
Message message = (Message) procMessage;
gui_io.setReceivedMessage(message.getMessage());
if(prevMessages.containsKey(message.getId()))return null;
String newText = Refiner.refineText(message.getMessage());
int newColor = Refiner.refineColor(message.getColor());
message.setMessage(newText);
message.setColor(newColor);
gui_io.setSignal(newColor);
gui_io.setRefinedMessage(newText);
prevMessages.put(message.getId());
return message;
} }
/* /*
...@@ -61,6 +79,14 @@ public class MessageBroker extends Thread { ...@@ -61,6 +79,14 @@ public class MessageBroker extends Thread {
* *
*/ */
public void run() { public void run() {
while (true) {
try {
Message procMessage = process(procQueue.take());
if(procMessage != null){ send(procMessage);}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} }
...@@ -70,6 +96,7 @@ public class MessageBroker extends Thread { ...@@ -70,6 +96,7 @@ public class MessageBroker extends Thread {
* @param message The Message object to be sent * @param message The Message object to be sent
*/ */
public void send(Message message) { public void send(Message message) {
if(!prevMessages.containsKey(message.getId())) prevMessages.put(message.getId());
network.postMessage(message); network.postMessage(message);
} }
......
package fi.utu.tech.telephonegame.network;
import fi.utu.tech.telephonegame.Message;
import java.io.*;
import java.net.Socket;
import java.util.concurrent.TransferQueue;
public class ClientHandler extends Thread {
private Socket asiakasSocket;
private TransferQueue<Object> inQueue;
private ObjectOutputStream outputStream;
public ClientHandler(Socket asiakasSocket, TransferQueue<Object> inQueue){
this.asiakasSocket = asiakasSocket;
this.inQueue = inQueue;
try {
this.outputStream = new ObjectOutputStream(asiakasSocket.getOutputStream());
}
catch (IOException e){
System.err.println("Can not get an output stream.");
System.err.println(e.getMessage());
}
}
public void sendMessage(Serializable message){
try{
outputStream.writeObject(message);
outputStream.flush();
}
catch (IOException e){
System.err.println("Can not send an output message.");
System.err.println(e.getMessage());
}
}
@Override
public void run() {
System.out.println(this.toString());
try {
ObjectInputStream inputStream = new ObjectInputStream(asiakasSocket.getInputStream());
while (true) {
try {
Message inMessage = (Message) inputStream.readObject();
System.out.println("Socket "+asiakasSocket.toString()+" got a message "+inMessage.toString());
inQueue.add(inMessage);
} catch (IOException e) {
System.err.println("Can not get an input message.");
System.err.println(e.getMessage());
} catch (ClassNotFoundException e) {
System.err.println("Can not convert an input message to type Message.");
System.err.println(e.getMessage());
}
}
}
catch (IOException e){
System.err.println("Can not get an input stream.");
System.err.println(e.getMessage());
}
}
}
...@@ -2,7 +2,11 @@ package fi.utu.tech.telephonegame.network; ...@@ -2,7 +2,11 @@ package fi.utu.tech.telephonegame.network;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue; import java.util.concurrent.TransferQueue;
...@@ -14,6 +18,8 @@ public class NetworkService extends Thread implements Network { ...@@ -14,6 +18,8 @@ public class NetworkService extends Thread implements Network {
*/ */
private TransferQueue<Object> inQueue = new LinkedTransferQueue<Object>(); // For messages incoming from network private TransferQueue<Object> inQueue = new LinkedTransferQueue<Object>(); // For messages incoming from network
private TransferQueue<Serializable> outQueue = new LinkedTransferQueue<Serializable>(); // For messages outgoing to network private TransferQueue<Serializable> outQueue = new LinkedTransferQueue<Serializable>(); // For messages outgoing to network
private List<ClientHandler> clientList = new ArrayList<ClientHandler>();
private ServerSocket serverSocket = null;
/* /*
...@@ -33,8 +39,39 @@ public class NetworkService extends Thread implements Network { ...@@ -33,8 +39,39 @@ public class NetworkService extends Thread implements Network {
* *
*/ */
public void startListening (int serverPort) { public void startListening (int serverPort) {
try {
System.out.printf("I should start listening for peers at port %d%n", serverPort); System.out.printf("I should start listening for peers at port %d%n", serverPort);
// TODO serverSocket = new ServerSocket(serverPort);
System.out.println(serverSocket.toString());
Runnable startListeningFunc = ()-> {
while(true) {
try {
Socket asiakas = serverSocket.accept();
System.out.println(asiakas.toString());
ClientHandler ch = new ClientHandler(asiakas, inQueue);
clientList.add(ch);
System.out.println("Server " + serverSocket.toString() + " got new peer " + asiakas.toString());
ch.start();
} catch (IOException e) {
System.out.println("Can not accept peer connection.");
System.out.println(e.getMessage());
}
}
};
Thread listeningServerThread = new Thread(startListeningFunc);
listeningServerThread.start();
System.out.println("Listening is started");
}
catch(IOException e){
System.err.printf("I can not start listening for peers at port %d%n", serverPort);
System.err.printf("Exeption message: %d%n", e.getMessage());
}
} }
/** /**
...@@ -47,7 +84,18 @@ public class NetworkService extends Thread implements Network { ...@@ -47,7 +84,18 @@ public class NetworkService extends Thread implements Network {
*/ */
public void connect(String peerIP, int peerPort) throws IOException, UnknownHostException { public void connect(String peerIP, int peerPort) throws IOException, UnknownHostException {
System.out.printf("I should connect myself to %s, port %d%n", peerIP, peerPort); System.out.printf("I should connect myself to %s, port %d%n", peerIP, peerPort);
// TODO try {
Socket asiakas = new Socket(peerIP, peerPort);
ClientHandler ch = new ClientHandler(asiakas, inQueue);
clientList.add(ch);
ch.start();
}
catch (IOException e){
System.err.println("Can not connect to the server.");
System.err.println(e.getMessage());
}
} }
/** /**
...@@ -58,7 +106,10 @@ public class NetworkService extends Thread implements Network { ...@@ -58,7 +106,10 @@ public class NetworkService extends Thread implements Network {
*/ */
private void send(Serializable out) { private void send(Serializable out) {
// Send the object to all neighbouring nodes // Send the object to all neighbouring nodes
// TODO System.out.println("I have numbers of peers "+clientList.size());
for(ClientHandler client: clientList){
client.sendMessage(out);
}
} }
/* /*
...@@ -101,6 +152,7 @@ public class NetworkService extends Thread implements Network { ...@@ -101,6 +152,7 @@ public class NetworkService extends Thread implements Network {
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
} }
......
...@@ -44,6 +44,7 @@ public class Resolver extends Thread { ...@@ -44,6 +44,7 @@ public class Resolver extends Thread {
this.serverMode = true; this.serverMode = true;
try { try {
this.serverSocket = new DatagramSocket(port); this.serverSocket = new DatagramSocket(port);
System.out.println("Resolever UDP server started on port "+port);
} catch (SocketException e) { } catch (SocketException e) {
e.printStackTrace(); e.printStackTrace();
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment