notify listeners in background

This commit is contained in:
Carsten Otto
2022-04-11 00:14:56 +02:00
parent 9fbdcfc590
commit 7d5fff1245
4 changed files with 39 additions and 14 deletions

View File

@@ -7,6 +7,7 @@ dependencies {
implementation project(':grpc-client')
implementation project(':hardcoded')
implementation project(':model')
testImplementation 'org.awaitility:awaitility:4.1.1'
testImplementation testFixtures(project(':model'))
}
@@ -26,4 +27,4 @@ jacocoTestCoverageVerification {
}
}
}
}
}

View File

@@ -11,6 +11,8 @@ import lnrpc.RPCMiddlewareResponse;
import javax.annotation.CheckForNull;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class RequestAndResponseStreamObserver implements StreamObserver<RPCMiddlewareRequest> {
private static final String MIDDLEWARE_NAME = "lnd-manageJ";
@@ -27,6 +29,7 @@ class RequestAndResponseStreamObserver implements StreamObserver<RPCMiddlewareRe
private StreamObserver<RPCMiddlewareResponse> responseObserver;
private final Multimap<String, RequestListener<?>> requestListeners = ArrayListMultimap.create();
private final Multimap<String, ResponseListener<?>> responseListeners = ArrayListMultimap.create();
private final ExecutorService executorService = Executors.newCachedThreadPool();
public RequestAndResponseStreamObserver() {
// default constructor
@@ -40,10 +43,13 @@ class RequestAndResponseStreamObserver implements StreamObserver<RPCMiddlewareRe
}
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void onNext(RPCMiddlewareRequest value) {
respondWithDoNotReplace(value.getMsgId());
handleRequest(value);
handleResponse(value);
executorService.submit(() -> {
handleRequest(value);
handleResponse(value);
});
}
@Override

View File

@@ -14,7 +14,9 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -68,7 +70,9 @@ class GrpcMiddlewareServiceTest {
RPCMiddlewareRequest message =
RPCMiddlewareRequest.newBuilder().setMsgId(123).setRequestId(456).setRequest(request).build();
streamRequestObserver.onNext(message);
verify(requestListener).acceptRequest(expectedPayload, 456);
await().atMost(1, TimeUnit.SECONDS).untilAsserted(
() -> verify(requestListener).acceptRequest(expectedPayload, 456)
);
}
@Test
@@ -78,7 +82,9 @@ class GrpcMiddlewareServiceTest {
RPCMiddlewareRequest message =
RPCMiddlewareRequest.newBuilder().setMsgId(123).setRequestId(456).setResponse(response).build();
streamRequestObserver.onNext(message);
verify(responseListener).acceptResponse(expectedPayload, 456);
await().atMost(1, TimeUnit.SECONDS).untilAsserted(
() -> verify(responseListener).acceptResponse(expectedPayload, 456)
);
}
private boolean isRegistrationMessage(RPCMiddlewareResponse value) {

View File

@@ -13,8 +13,10 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doThrow;
@@ -73,14 +75,18 @@ class RequestAndResponseStreamObserverTest {
long messageId = MESSAGE_ID;
String payload = "request-payload";
observer.onNext(createRequestMessage(REQUEST_LISTENER_TYPE, messageId, payload));
verify(requestListener).acceptRequest(ByteString.copyFromUtf8(payload), messageId);
await().atMost(1, TimeUnit.SECONDS).untilAsserted(
() -> verify(requestListener).acceptRequest(ByteString.copyFromUtf8(payload), messageId)
);
}
@Test
void ignores_listener_with_other_type_for_request() {
observer.addRequestListener(requestListener);
observer.onNext(createRequestMessage("not-listener-type", MESSAGE_ID, "xxx"));
verify(requestListener, never()).acceptRequest(any(ByteString.class), anyLong());
await().atLeast(100, TimeUnit.MILLISECONDS).untilAsserted(
() -> verify(requestListener, never()).acceptRequest(any(ByteString.class), anyLong())
);
}
@Test
@@ -89,31 +95,37 @@ class RequestAndResponseStreamObserverTest {
long messageId = 456;
String payload = "response-payload";
observer.onNext(createResponseMessage(RESPONSE_LISTENER_TYPE, messageId, payload));
verify(responseListener).acceptResponse(ByteString.copyFromUtf8(payload), messageId);
await().atMost(1, TimeUnit.SECONDS).untilAsserted(
() -> verify(responseListener).acceptResponse(ByteString.copyFromUtf8(payload), messageId)
);
}
@Test
void ignores_listener_with_other_type_for_response() {
observer.addResponseListener(responseListener);
observer.onNext(createResponseMessage("not-listener-type", 456, "yyy"));
verify(responseListener, never()).acceptResponse(any(ByteString.class), anyLong());
await().atLeast(100, TimeUnit.MILLISECONDS).untilAsserted(
() -> verify(responseListener, never()).acceptResponse(any(ByteString.class), anyLong())
);
}
@Test
void acknowledges_message() {
observer.addResponseListener(responseListener);
observer.onNext(createResponseMessage("foo", 100, "zzz"));
verify(responseObserver).onNext(ackMessage());
await().atMost(1, TimeUnit.SECONDS).untilAsserted(
() -> verify(responseObserver).onNext(ackMessage())
);
}
@Test
void acknowledges_message_also_if_listener_crashes() {
observer.addResponseListener(responseListener);
doThrow(new NullPointerException()).when(responseListener).acceptResponse(any(ByteString.class), anyLong());
assertThatExceptionOfType(NullPointerException.class).isThrownBy(() ->
observer.onNext(createResponseMessage(RESPONSE_LISTENER_TYPE, 100, "crash"))
observer.onNext(createResponseMessage(RESPONSE_LISTENER_TYPE, 100, "crash"));
await().atMost(1, TimeUnit.SECONDS).untilAsserted(
() -> verify(responseObserver).onNext(ackMessage())
);
verify(responseObserver).onNext(ackMessage());
}
private RPCMiddlewareResponse ackMessage() {