From 98fc7703f42a51b3ab247a8f715a3437469a8412 Mon Sep 17 00:00:00 2001 From: bvn13 Date: Mon, 21 Feb 2022 02:07:35 +0300 Subject: [PATCH] many fixes --- build.gradle | 13 +++- .../me/bvn13/sewy/AbstractClientListener.java | 76 +++++++++---------- src/main/java/me/bvn13/sewy/Client.java | 8 +- .../me/bvn13/sewy/ClientListenerFactory.java | 8 +- .../java/me/bvn13/sewy/CommandClient.java | 19 +++-- .../me/bvn13/sewy/CommandClientListener.java | 64 +++++++++------- .../java/me/bvn13/sewy/CommandServer.java | 57 ++++++++------ src/main/java/me/bvn13/sewy/Server.java | 46 ++++++++--- .../me/bvn13/sewy/EchoClientListener.java | 14 +++- src/test/java/me/bvn13/sewy/ServerTest.java | 4 +- 10 files changed, 189 insertions(+), 120 deletions(-) diff --git a/build.gradle b/build.gradle index 620c137..0675aa4 100644 --- a/build.gradle +++ b/build.gradle @@ -5,7 +5,7 @@ plugins { } group 'me.bvn13' -version '1.2.4' +version '1.2.5' repositories { mavenCentral() @@ -34,6 +34,10 @@ tasks.withType(Test).configureEach { maxParallelForks = 1 } +task sourceJar(type: Jar) { + from sourceSets.main.allJava +} + publishing { repositories { maven { @@ -49,5 +53,12 @@ publishing { gpr(MavenPublication) { from(components.java) } + mavenJava(MavenPublication) { + from components.java + + artifact sourceJar { + classifier "sources" + } + } } } \ No newline at end of file diff --git a/src/main/java/me/bvn13/sewy/AbstractClientListener.java b/src/main/java/me/bvn13/sewy/AbstractClientListener.java index 4ef729e..f407886 100644 --- a/src/main/java/me/bvn13/sewy/AbstractClientListener.java +++ b/src/main/java/me/bvn13/sewy/AbstractClientListener.java @@ -47,7 +47,7 @@ public abstract class AbstractClientListener implements Runnable { this.in = socket.getInputStream(); log.debug("BufferedReader successfully created"); log.debug("PrintWriter successfully created"); - out = socket.getOutputStream(); + this.out = socket.getOutputStream(); log.debug("OutputStream successfully created"); } catch (IOException e) { throw new RuntimeException(e); @@ -66,7 +66,7 @@ public abstract class AbstractClientListener implements Runnable { * * @return the line read from socket */ - public String readLine() { + public String readLine() throws IOException { final byte[] bytes = readBytes(getSeparator()); final StringBuilder sb = new StringBuilder(); for (byte aByte : bytes) { @@ -83,40 +83,36 @@ public abstract class AbstractClientListener implements Runnable { * @param separator byte to separate data portions * @return array of bytes read from socket */ - public byte[] readBytes(byte[] separator) { + public byte[] readBytes(byte[] separator) throws IOException { final List data = new ArrayList<>(2048 * 2048); List buffer = new ArrayList<>(separator.length); int separatorPosition = 0; - try { - while (socket.isConnected()) { - byte[] portion = in.readNBytes(1); - if (portion == null || portion.length == 0) { + while (socket.isConnected() && !socket.isClosed()) { + byte[] portion = in.readNBytes(1); + if (portion == null || portion.length == 0) { + break; + } + if (portion[0] == separator[separatorPosition]) { + if (separatorPosition == separator.length - 1) { break; } - if (portion[0] == separator[separatorPosition]) { - if (separatorPosition == separator.length - 1) { - break; - } - separatorPosition++; - buffer.add(portion[0]); - continue; - } else { - separatorPosition = 0; - data.addAll(buffer); - buffer.clear(); - } - data.add(portion[0]); + separatorPosition++; + buffer.add(portion[0]); + continue; + } else { + separatorPosition = 0; + data.addAll(buffer); + buffer.clear(); } - final byte[] bytes = new byte[data.size()]; - int i = 0; - for (Byte aByte : data) { - bytes[i++] = aByte; - } - if (log.isTraceEnabled()) log.trace("Received {} bytes: {}", bytes.length, bytes); - return bytes; - } catch (IOException e) { - throw new RuntimeException(e); + data.add(portion[0]); } + final byte[] bytes = new byte[data.size()]; + int i = 0; + for (Byte aByte : data) { + bytes[i++] = aByte; + } + if (log.isTraceEnabled()) log.trace("Received {} bytes: {}", bytes.length, bytes); + return bytes; } /** @@ -126,15 +122,11 @@ public abstract class AbstractClientListener implements Runnable { * @param bytes bytes to be sent into socket * @param separator byte to separate data portions */ - public void writeBytes(byte[] bytes, byte[] separator) { + public void writeBytes(byte[] bytes, byte[] separator) throws IOException { if (log.isTraceEnabled()) log.trace("Sending {} bytes: {}", bytes.length, bytes); - try { - out.write(bytes); - out.write(separator); - out.flush(); - } catch (IOException e) { - throw new RuntimeException(e); - } + out.write(bytes); + out.write(separator); + out.flush(); } /** @@ -145,7 +137,11 @@ public abstract class AbstractClientListener implements Runnable { */ public void writeLine(String data) { if (log.isTraceEnabled()) log.trace("Sending: " + data); - writeBytes(data.getBytes(), getSeparator()); + try { + writeBytes(data.getBytes(), getSeparator()); + } catch (Exception e) { + log.error("", e); + } } /** @@ -157,12 +153,12 @@ public abstract class AbstractClientListener implements Runnable { out.close(); in.close(); } catch (IOException e) { - log.warn("Unable to close IN client buffer"); + log.warn("Unable to close IN/OUT client buffer"); } try { socket.close(); } catch (IOException e) { - throw new RuntimeException(e); + log.warn("Unable to close socket"); } } } diff --git a/src/main/java/me/bvn13/sewy/Client.java b/src/main/java/me/bvn13/sewy/Client.java index be36bfe..bc53134 100644 --- a/src/main/java/me/bvn13/sewy/Client.java +++ b/src/main/java/me/bvn13/sewy/Client.java @@ -115,7 +115,11 @@ public class Client { * @return the line read from socket */ public String readLine() { - return client.readLine(); + try { + return client.readLine(); + } catch (Exception e) { + throw new RuntimeException(e); + } } /** @@ -123,7 +127,7 @@ public class Client { * @param separator * @return */ - public byte[] readBytes(byte[] separator) { + public byte[] readBytes(byte[] separator) throws IOException { return client.readBytes(separator); } diff --git a/src/main/java/me/bvn13/sewy/ClientListenerFactory.java b/src/main/java/me/bvn13/sewy/ClientListenerFactory.java index 7ddf448..f8fc22b 100644 --- a/src/main/java/me/bvn13/sewy/ClientListenerFactory.java +++ b/src/main/java/me/bvn13/sewy/ClientListenerFactory.java @@ -26,16 +26,16 @@ class ClientListenerFactory { /** * Creates client listener constructor + * * @param clientListenerClass class to be used as client listener - * @param generic type + * @param generic type * @return lambda method to create client listener */ @SuppressWarnings("unchecked") static Function createClientListenerConstructor(Class clientListenerClass) { - if (clientListenerClass.getGenericSuperclass() == null - /*|| !clientListenerClass.getGenericSuperclass().equals(T.class)*/) { - throw new IllegalArgumentException("Wrong client listener of type: "+clientListenerClass.getName()); + if (clientListenerClass.getGenericSuperclass() == null) { + throw new IllegalArgumentException("Wrong client listener of type: " + clientListenerClass.getName()); } return (client) -> { diff --git a/src/main/java/me/bvn13/sewy/CommandClient.java b/src/main/java/me/bvn13/sewy/CommandClient.java index cfc57e4..9bead6b 100644 --- a/src/main/java/me/bvn13/sewy/CommandClient.java +++ b/src/main/java/me/bvn13/sewy/CommandClient.java @@ -19,10 +19,10 @@ import me.bvn13.sewy.command.AbstractCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.net.Socket; import java.util.function.Function; -import static java.lang.String.format; import static me.bvn13.sewy.ClientListenerFactory.createClientListenerConstructor; /** @@ -41,6 +41,7 @@ public class CommandClient extends Client { /** * Starts to connect to server immediately + * * @param host host to connect to * @param port port to be used while connecting */ @@ -50,8 +51,9 @@ public class CommandClient extends Client { /** * Starts to connect to server immediately - * @param host host to connect to - * @param port port to be used while connecting + * + * @param host host to connect to + * @param port port to be used while connecting * @param clientListenerClass client listener class describing protocol of communications */ public CommandClient(String host, int port, Class clientListenerClass) { @@ -60,8 +62,9 @@ public class CommandClient extends Client { /** * Connects to server immediately - * @param host host to connect to - * @param port port to be used while connecting + * + * @param host host to connect to + * @param port port to be used while connecting * @param clientListenerConstructor to provide constructor for client listener (see {@link me.bvn13.sewy.Client#Client(java.lang.String, int, java.lang.Class)}) */ public CommandClient(String host, int port, Function clientListenerConstructor) { @@ -71,10 +74,12 @@ public class CommandClient extends Client { /** * Sends command to server + * * @param command command to be sent - * @param generic type + * @param generic type + * @throws IOException if any error occurred while sending */ - public void send(T command) { + public void send(T command) throws IOException { log.debug("Start to send command: " + command); client.send(command); } diff --git a/src/main/java/me/bvn13/sewy/CommandClientListener.java b/src/main/java/me/bvn13/sewy/CommandClientListener.java index 9445094..a557bb9 100644 --- a/src/main/java/me/bvn13/sewy/CommandClientListener.java +++ b/src/main/java/me/bvn13/sewy/CommandClientListener.java @@ -18,6 +18,7 @@ package me.bvn13.sewy; import me.bvn13.sewy.command.AbstractCommand; import org.apache.commons.lang3.SerializationUtils; +import java.io.IOException; import java.io.Serializable; import java.net.Socket; @@ -39,34 +40,38 @@ public class CommandClientListener extends AbstractClientListener implements Abs public void run() { for (Thread.yield(); !socket.isConnected() && !socket.isClosed(); Thread.yield()) { } - while (socket.isConnected()) { - Thread.yield(); - byte[] line = readBytes(getSeparator()); - if (line == null || line.length == 0) { - continue; - } - final Object command; + while (socket.isConnected() && !socket.isClosed()) { try { - command = SerializationUtils.deserialize(line); - } catch (Throwable e) { - log.warn("Deserialization exception occurred!", e); - continue; + Thread.yield(); + byte[] line = readBytes(getSeparator()); + if (line == null || line.length == 0) { + continue; + } + final Object command; + try { + command = SerializationUtils.deserialize(line); + } catch (Throwable e) { + log.warn("Deserialization exception occurred!", e); + continue; + } + if (command == null) { + continue; + } + if (!Sewy.getRegisteredDataTypes().contains(command.getClass())) { + log.error("Unexpected command received"); + continue; + } + log.debug("Command received: " + command.getClass()); + if (!(command instanceof AbstractCommand)) { + log.warn("Incorrect command received: " + command); + continue; + } + final Serializable response = onCommand((AbstractCommand) command); + log.debug(format("Response for %s is: %s", command, response)); + writeBytes(SerializationUtils.serialize(response), getSeparator()); + } catch (Exception e) { + log.error("Failed to communicate!", e); } - if (command == null) { - continue; - } - if (!Sewy.getRegisteredDataTypes().contains(command.getClass())) { - log.error("Unexpected command received"); - continue; - } - log.debug("Command received: " + command.getClass()); - if (!(command instanceof AbstractCommand)) { - log.warn("Incorrect command received: " + command); - continue; - } - final Serializable response = onCommand((AbstractCommand) command); - log.debug(format("Response for %s is: %s", command, response)); - writeBytes(SerializationUtils.serialize(response), getSeparator()); } } @@ -85,11 +90,12 @@ public class CommandClientListener extends AbstractClientListener implements Abs /** * Sends command to opposite side + * * @param command command to be sent - * @param generic type + * @param generic type */ - public void send(T command) { - log.debug("Start to send command: " + command); + public void send(T command) throws IOException { + log.debug("Start to send command: {}", command); writeBytes(SerializationUtils.serialize(command), getSeparator()); } diff --git a/src/main/java/me/bvn13/sewy/CommandServer.java b/src/main/java/me/bvn13/sewy/CommandServer.java index ac7cad8..27727c5 100644 --- a/src/main/java/me/bvn13/sewy/CommandServer.java +++ b/src/main/java/me/bvn13/sewy/CommandServer.java @@ -16,19 +16,12 @@ package me.bvn13.sewy; import me.bvn13.sewy.command.AbstractCommand; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.function.Consumer; import java.util.function.Function; import static java.lang.String.format; @@ -50,8 +43,8 @@ public class CommandServer extends Server { } /** - * @param host host to bind in order to start listen to clients - * @param port port to start listen to + * @param host host to bind in order to start listen to clients + * @param port port to start listen to * @param clientListenerClass client listen class to be used for communication */ public CommandServer(String host, int port, Class clientListenerClass) { @@ -59,9 +52,8 @@ public class CommandServer extends Server { } /** - * - * @param host host to bind in order to start listen to clients - * @param port port to start listen to + * @param host host to bind in order to start listen to clients + * @param port port to start listen to * @param clientListenerConstructor to provide constructor for client listener (see {@link CommandServer#CommandServer(String, int, Class)}) */ @SuppressWarnings("unchecked") @@ -73,10 +65,12 @@ public class CommandServer extends Server { socket = server; while (!server.isClosed()) { - final Socket client = server.accept(); - final CommandClientListener clientListener = clientListenerConstructor.apply(client); - executor.execute(clientListener); - clients.add(clientListener); + if (!isMaximumClientsAchieved()) { + final Socket client = server.accept(); + final CommandClientListener clientListener = clientListenerConstructor.apply(client); + executor.execute(clientListener); + clients.add(clientListener); + } } } catch (IOException e) { @@ -87,14 +81,31 @@ public class CommandServer extends Server { /** * Sends command to every client - * @param command command to be sent - * @param generic type + * + * @param command command to be sent + * @param generic type */ public void send(T command) { - log.debug("Start to send command: " + command); - for (CommandClientListener client : clients) { - client.send(command); - } + send(command, client -> {}); } + /** + * Sends command to every client + * + * @param command command to be sent + * @param generic type + * @param onException for catching errors while sending. Do not throw any Exception inside onException callback - + * it leads to stopping sending the command to remaining clients + */ + public void send(T command, Consumer onException) { + log.debug("Start to send command: " + command); + for (CommandClientListener client : clients) { + try { + client.send(command); + } catch (IOException e) { + log.error("Failed to send command " + command, e); + onException.accept(client); + } + } + } } diff --git a/src/main/java/me/bvn13/sewy/Server.java b/src/main/java/me/bvn13/sewy/Server.java index c60c1fb..0176222 100644 --- a/src/main/java/me/bvn13/sewy/Server.java +++ b/src/main/java/me/bvn13/sewy/Server.java @@ -46,12 +46,14 @@ public class Server { protected ServerSocket socket; + private int maxClientsCount; + protected Server() { } /** - * @param host host to bind in order to start listen to clients - * @param port port to start listen to + * @param host host to bind in order to start listen to clients + * @param port port to start listen to * @param clientListenerClass client listen class to be used for communication */ public Server(String host, int port, Class clientListenerClass) { @@ -59,9 +61,8 @@ public class Server { } /** - * - * @param host host to bind in order to start listen to clients - * @param port port to start listen to + * @param host host to bind in order to start listen to clients + * @param port port to start listen to * @param clientListenerConstructor to provide constructor for client listener (see {@link me.bvn13.sewy.Server#Server(java.lang.String, int, java.lang.Class)}) */ @SuppressWarnings("unchecked") @@ -73,17 +74,19 @@ public class Server { socket = server; while (!server.isClosed()) { - final Socket client = server.accept(); - final T clientListener = clientListenerConstructor.apply(client); - executor.execute(clientListener); - clients.add(clientListener); + if (!isMaximumClientsAchieved()) { + final Socket client = server.accept(); + final T clientListener = clientListenerConstructor.apply(client); + executor.execute(clientListener); + clients.add(clientListener); + } + Thread.yield(); } } catch (IOException e) { log.error(format("Error while conversation with %s:%d", host, port), e); } }); - } /** @@ -110,10 +113,33 @@ public class Server { /** * To check whether the server is ready for new connections + * * @return */ public boolean isListening() { return socket != null && socket.isBound(); } + /** + * Returns count of connected clients + * + * @return count of connected clients + */ + public int getClientsCount() { + return clients.size(); + } + + /** + * Sets maximum clients to be connected to server + * + * @param count maximum clients count + */ + public void setMaxClientsCount(int count) { + maxClientsCount = count; + } + + protected boolean isMaximumClientsAchieved() { + return maxClientsCount == 0 + || clients.size() >= maxClientsCount; + } } diff --git a/src/test/java/me/bvn13/sewy/EchoClientListener.java b/src/test/java/me/bvn13/sewy/EchoClientListener.java index 0d29080..595e228 100644 --- a/src/test/java/me/bvn13/sewy/EchoClientListener.java +++ b/src/test/java/me/bvn13/sewy/EchoClientListener.java @@ -15,6 +15,9 @@ */ package me.bvn13.sewy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.net.Socket; /** @@ -22,6 +25,9 @@ import java.net.Socket; * Writes into socket all the data received before */ public class EchoClientListener extends AbstractClientListener { + + private final Logger log = LoggerFactory.getLogger(EchoClientListener.class); + public EchoClientListener(Socket socket) { super(socket); } @@ -33,8 +39,12 @@ public class EchoClientListener extends AbstractClientListener { public void run() { while (socket.isConnected()) { Thread.yield(); - final String data = readLine(); - writeLine(data); + try { + final String data = readLine(); + writeLine(data); + } catch (Exception e) { + log.error("", e); + } } } } diff --git a/src/test/java/me/bvn13/sewy/ServerTest.java b/src/test/java/me/bvn13/sewy/ServerTest.java index 3ebbcd1..7d14823 100644 --- a/src/test/java/me/bvn13/sewy/ServerTest.java +++ b/src/test/java/me/bvn13/sewy/ServerTest.java @@ -83,7 +83,7 @@ public class ServerTest { @ParameterizedTest @ValueSource(ints = START_PORT + 6) - void serverIsAbleToPingPong(int port) throws InterruptedException { + void serverIsAbleToPingPong(int port) throws Exception { Sewy.register(PingCommand.class); Sewy.register(PongCommand.class); @@ -116,7 +116,7 @@ public class ServerTest { @ParameterizedTest @ValueSource(ints = START_PORT + 7) - void wideSeparatorTest(int port) throws InterruptedException { + void wideSeparatorTest(int port) throws Exception { Sewy.register(ComplexCommand.class); Sewy.setSeparator(new byte[] { '\n', 'e', 'n', 'd', '\n' });