Adds reconnect

This commit is contained in:
Thomas Philipona
2018-10-08 16:03:34 +02:00
parent 05ee7eb56e
commit 28cf1a75a4

View File

@@ -1,13 +1,22 @@
package ch.puzzle.lnd.websocketbridge; package ch.puzzle.lnd.websocketbridge;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
import org.springframework.messaging.converter.MappingJackson2MessageConverter; import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandler; import org.springframework.messaging.simp.stomp.StompSessionHandler;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.socket.client.WebSocketClient; import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient; import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient; import org.springframework.web.socket.messaging.WebSocketStompClient;
@@ -15,10 +24,6 @@ import org.springframework.web.socket.sockjs.client.SockJsClient;
import org.springframework.web.socket.sockjs.client.Transport; import org.springframework.web.socket.sockjs.client.Transport;
import org.springframework.web.socket.sockjs.client.WebSocketTransport; import org.springframework.web.socket.sockjs.client.WebSocketTransport;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@Component @Component
public class WebsocketBridge implements Runnable, DisposableBean { public class WebsocketBridge implements Runnable, DisposableBean {
@@ -32,8 +37,9 @@ public class WebsocketBridge implements Runnable, DisposableBean {
private static String OPTION_ARG_TOPIC = "topic"; private static String OPTION_ARG_TOPIC = "topic";
private static String OPTION_ARG_COMMAND = "command"; private static String OPTION_ARG_COMMAND = "command";
private String url = DEFAULT_URL;
private final ApplicationArguments args; private String topic = DEFAULT_TOPIC;
private String command = DEFAULT_COMMAND;
private Thread thread; private Thread thread;
@@ -41,14 +47,14 @@ public class WebsocketBridge implements Runnable, DisposableBean {
@Autowired @Autowired
public WebsocketBridge(ApplicationArguments args) { public WebsocketBridge(ApplicationArguments args) {
this.args = args;
initArguments(args);
this.thread = new Thread(this); this.thread = new Thread(this);
this.thread.start(); this.thread.start();
} }
@Override private void initArguments(ApplicationArguments args) {
public void run() {
logger.info("Application started with command-line arguments: {}", Arrays.toString(args.getSourceArgs())); logger.info("Application started with command-line arguments: {}", Arrays.toString(args.getSourceArgs()));
logger.info("NonOptionArgs: {}", args.getNonOptionArgs()); logger.info("NonOptionArgs: {}", args.getNonOptionArgs());
logger.info("OptionNames: {}", args.getOptionNames()); logger.info("OptionNames: {}", args.getOptionNames());
@@ -57,9 +63,6 @@ public class WebsocketBridge implements Runnable, DisposableBean {
logger.info("NonOptionArgs: {}", args.getNonOptionArgs()); logger.info("NonOptionArgs: {}", args.getNonOptionArgs());
logger.info("OptionNames: {}", args.getOptionNames()); logger.info("OptionNames: {}", args.getOptionNames());
String url = DEFAULT_URL;
String topic = DEFAULT_TOPIC;
String command = DEFAULT_COMMAND;
if (args.getOptionValues(OPTION_ARG_URL) != null && !args.getOptionValues(OPTION_ARG_URL).equals("")) { if (args.getOptionValues(OPTION_ARG_URL) != null && !args.getOptionValues(OPTION_ARG_URL).equals("")) {
url = args.getOptionValues(OPTION_ARG_URL).get(0); url = args.getOptionValues(OPTION_ARG_URL).get(0);
} }
@@ -71,19 +74,50 @@ public class WebsocketBridge implements Runnable, DisposableBean {
if (args.getOptionValues(OPTION_ARG_COMMAND) != null && !args.getOptionValues(OPTION_ARG_COMMAND).equals("")) { if (args.getOptionValues(OPTION_ARG_COMMAND) != null && !args.getOptionValues(OPTION_ARG_COMMAND).equals("")) {
command = args.getOptionValues(OPTION_ARG_COMMAND).get(0); command = args.getOptionValues(OPTION_ARG_COMMAND).get(0);
} }
}
private ListenableFuture<StompSession> connect() throws InterruptedException{
List<Transport> transports = new ArrayList<>(1); List<Transport> transports = new ArrayList<>(1);
transports.add(new WebSocketTransport(new StandardWebSocketClient())); transports.add(new WebSocketTransport(new StandardWebSocketClient()));
WebSocketClient transport = new SockJsClient(transports); WebSocketClient transport = new SockJsClient(transports);
WebSocketStompClient stompClient = new WebSocketStompClient(transport); WebSocketStompClient stompClient = new WebSocketStompClient(transport);
stompClient.setMessageConverter(new MappingJackson2MessageConverter()); stompClient.setMessageConverter(new MappingJackson2MessageConverter());
StompSessionHandler sessionHandler = new MyStompSessionHandler(topic, command); StompSessionHandler sessionHandler = new MyStompSessionHandler(topic, command);
stompClient.connect(url, sessionHandler);
ListenableFuture<StompSession> session = null;
boolean connected = false;
while(!connected) {
try {
Thread.sleep(5000);
session = stompClient.connect(url, sessionHandler);
session.get().isConnected();
connected = true;
}catch(HttpServerErrorException | ExecutionException e) {
logger.info("Connection Error", e);
// do try again
}
}
return session;
}
@Override
public void run() {
ListenableFuture<StompSession> session = null;
while (keepRunning) { while (keepRunning) {
try { try {
Thread.sleep(5000); Thread.sleep(5000);
} catch (InterruptedException e) { if(session == null || !session.get().isConnected()) {
logger.info("is not connected! Try reconnect");
session = connect();
}
} catch (InterruptedException | ExecutionException e) {
keepRunning = false; keepRunning = false;
} }
} }