simple conversation compatibility for client-server interoperations

This commit is contained in:
bvn13 2022-01-28 15:19:16 +03:00
parent 2dd4b39603
commit c93b36cc7f
8 changed files with 466 additions and 0 deletions

View File

@ -1,5 +1,6 @@
plugins { plugins {
id 'java' id 'java'
id 'idea'
} }
group 'me.bvn13' group 'me.bvn13'
@ -10,10 +11,21 @@ repositories {
} }
dependencies { dependencies {
// https://mvnrepository.com/artifact/org.slf4j/slf4j-api
implementation group: 'org.slf4j', name: 'slf4j-api', version: '1.7.35'
testImplementation 'ch.qos.logback:logback-classic:1.2.10'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1' testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1' testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
testImplementation 'org.junit.jupiter:junit-jupiter-params:5.8.1'
} }
test { test {
useJUnitPlatform() useJUnitPlatform()
}
tasks.withType(Test).configureEach {
maxParallelForks = 1
} }

View File

@ -0,0 +1,74 @@
/*
Copyright 2020 Vyacheslav Boyko
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package me.bvn13.sewy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
public abstract class AbstractClientListener implements Runnable {
protected static final Logger log = LoggerFactory.getLogger(AbstractClientListener.class);
protected final Socket socket;
protected PrintWriter out;
protected BufferedReader in;
protected AbstractClientListener(Socket socket) {
this.socket = socket;
try {
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
} catch (IOException e) {
throw new RuntimeException(e);
}
try {
out = new PrintWriter(socket.getOutputStream(), true);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public String readLine() {
try {
return in.readLine();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void writeLine(String data) {
out.println(data);
}
void stop() {
out.close();
try {
in.close();
} catch (IOException e) {
log.warn("Unable to close IN client buffer");
}
try {
socket.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,98 @@
/*
Copyright 2020 Vyacheslav Boyko
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package me.bvn13.sewy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.lang.String.format;
public class Client {
private static final Logger log = LoggerFactory.getLogger(Client.class);
private final ExecutorService executor = Executors.newCachedThreadPool();
private Socket socket;
private PrintWriter out;
private BufferedReader in;
public Client() {
}
public Client(String host, int port) {
connect(host, port);
}
public void stop() {
out.close();
try {
in.close();
} catch (IOException e) {
log.warn("Unable to close IN client buffer");
}
try {
socket.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void connect(String host, int port) {
try {
socket = new Socket(host, port);
} catch (IOException e) {
log.error(format("Error while conversation with %s:%d", host, port), e);
stop();
return;
}
try {
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
} catch (IOException e) {
throw new RuntimeException(e);
}
try {
out = new PrintWriter(socket.getOutputStream(), true);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public String readLine() {
try {
return in.readLine();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void writeLine(String data) {
out.println(data);
}
boolean isConnected() {
return socket.isConnected();
}
}

View File

@ -0,0 +1,14 @@
package me.bvn13.sewy;
import java.net.Socket;
public class DefaultClientListener extends AbstractClientListener {
public DefaultClientListener(Socket socket) {
super(socket);
}
@Override
public void run() {
}
}

View File

@ -0,0 +1,142 @@
/*
Copyright 2020 Vyacheslav Boyko
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package me.bvn13.sewy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;
import static java.lang.String.format;
public class Server {
private static final Logger log = LoggerFactory.getLogger(Server.class);
private final ExecutorService executor = Executors.newCachedThreadPool();
final List<AbstractClientListener> clients = Collections.synchronizedList(new ArrayList<>());
private ServerSocket socket;
public Server(String host, int port) {
this(host, port, DefaultClientListener.class);
}
// @SuppressWarnings("unchecked")
// public Server(String host, int port, Class clientListenerClass) {
//
// if (clientListenerClass.getGenericSuperclass() == null
// || !clientListenerClass.getGenericSuperclass().equals(AbstractClientListener.class)) {
// throw new IllegalArgumentException("Wrong client listener of type: "+clientListenerClass.getName());
// }
//
// executor.execute(() -> {
// try (final ServerSocket server = new ServerSocket(port, 0, InetAddress.getByName(host))) {
//
// socket = server;
//
// while (!server.isClosed()) {
// final Socket client = server.accept();
// final Constructor<AbstractClientListener> constructor = clientListenerClass.getDeclaredConstructor(Socket.class);
// constructor.setAccessible(true);
// final AbstractClientListener clientListener = constructor.newInstance(client);
// executor.execute(clientListener);
// clients.add(clientListener);
// }
//
// } catch (IOException e) {
// log.error(format("Error while conversation with %s:%d", host, port), e);
// } catch (Exception e) {
// log.error(format("Unable to instantiate %s", clientListenerClass.getName()), e);
// }
// });
//
// }
@SuppressWarnings("unchecked")
public Server(String host, int port, Class clientListenerClass) {
this(host, port, defaultClientListenerConstructor(clientListenerClass));
}
@SuppressWarnings("unchecked")
public Server(String host, int port, Function<Socket, AbstractClientListener> clientListenerConstructor) {
executor.execute(() -> {
try (final ServerSocket server = new ServerSocket(port, 0, InetAddress.getByName(host))) {
socket = server;
while (!server.isClosed()) {
final Socket client = server.accept();
final AbstractClientListener clientListener = clientListenerConstructor.apply(client);
executor.execute(clientListener);
clients.add(clientListener);
}
} catch (IOException e) {
log.error(format("Error while conversation with %s:%d", host, port), e);
}
});
}
public void stop() {
final Iterator<AbstractClientListener> iterator = clients.iterator();
while (iterator.hasNext()) {
final AbstractClientListener client = iterator.next();
client.stop();
iterator.remove();
}
executor.shutdown();
}
boolean isListening() {
return socket != null && socket.isBound();
}
@SuppressWarnings("unchecked")
private static Function<Socket, AbstractClientListener> defaultClientListenerConstructor(Class clientListenerClass) {
if (clientListenerClass.getGenericSuperclass() == null
|| !clientListenerClass.getGenericSuperclass().equals(AbstractClientListener.class)) {
throw new IllegalArgumentException("Wrong client listener of type: "+clientListenerClass.getName());
}
return (client) -> {
try {
final Constructor<AbstractClientListener> constructor = clientListenerClass.getDeclaredConstructor(Socket.class);
constructor.setAccessible(true);
return constructor.newInstance(client);
} catch (Exception e) {
throw new RuntimeException(e);
}
};
}
}

View File

@ -0,0 +1,48 @@
/*
Copyright 2020 Vyacheslav Boyko
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package me.bvn13.sewy;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;
public final class Sewy {
private static Sewy INSTANCE;
private static final ReentrantLock LOCK = new ReentrantLock();
private final List<Serializable> registeredDataTypes = new CopyOnWriteArrayList<>();
public static void register(Serializable clazz) {
getInstance().registeredDataTypes.add(clazz);
}
private Sewy() {}
private static Sewy getInstance() {
try {
LOCK.lock();
if (INSTANCE == null) {
INSTANCE = new Sewy();
}
return INSTANCE;
} finally {
LOCK.unlock();
}
}
}

View File

@ -0,0 +1,14 @@
package me.bvn13.sewy;
import java.net.Socket;
public class EchoClientListener extends AbstractClientListener {
public EchoClientListener(Socket socket) {
super(socket);
}
@Override
public void run() {
writeLine(readLine());
}
}

View File

@ -0,0 +1,64 @@
package me.bvn13.sewy;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
public class ServerTest {
private static final int START_PORT = 12345;
@ParameterizedTest
@ValueSource(ints = START_PORT + 1)
void testServerStarts(int port) throws InterruptedException {
Server server = new Server("localhost", port);
Thread.sleep(1000);
Assertions.assertTrue(server.isListening());
server.stop();
}
@ParameterizedTest
@ValueSource(ints = START_PORT + 2)
void givenServerRunning_whenClientConnects_thenServerCanStopClientListener(int port) throws InterruptedException {
Server server = new Server("localhost", port);
Client client = new Client("localhost", port);
Thread.sleep(1000);
Assertions.assertTrue(server.isListening());
Assertions.assertTrue(client.isConnected());
server.stop();
client.stop();
}
@ParameterizedTest
@ValueSource(ints = START_PORT + 3)
void failedToStartServerWithBadClientListener(int port) {
Assertions.assertThrows(RuntimeException.class, () -> {
new Server("localhost", port, Object.class);
}, "Wrong client listener");
}
@ParameterizedTest
@ValueSource(ints = START_PORT + 4)
void serverStartedWithLambdaProvidedClientListener(int port) throws InterruptedException {
Server server = new Server("localhost", port, (socket) -> new AbstractClientListener(socket) {
@Override
public void run() {
}
});
Thread.sleep(1000);
Assertions.assertTrue(server.isListening());
server.stop();
}
@ParameterizedTest
@ValueSource(ints = START_PORT + 5)
void simpleEchoClientServer(int port) {
new Server("localhost", port, EchoClientListener.class);
Client client = new Client("localhost", port);
client.writeLine("hello");
String response = client.readLine();
Assertions.assertEquals("hello", response);
}
}