From 7d5fff124529ea78282b8be94fc49943a6b4cdcd Mon Sep 17 00:00:00 2001 From: Carsten Otto Date: Mon, 11 Apr 2022 00:14:56 +0200 Subject: [PATCH] notify listeners in background --- grpc-adapter/build.gradle | 3 +- .../RequestAndResponseStreamObserver.java | 10 +++++-- .../middleware/GrpcMiddlewareServiceTest.java | 10 +++++-- .../RequestAndResponseStreamObserverTest.java | 30 +++++++++++++------ 4 files changed, 39 insertions(+), 14 deletions(-) diff --git a/grpc-adapter/build.gradle b/grpc-adapter/build.gradle index 9585eeb5..c5c2d598 100644 --- a/grpc-adapter/build.gradle +++ b/grpc-adapter/build.gradle @@ -7,6 +7,7 @@ dependencies { implementation project(':grpc-client') implementation project(':hardcoded') implementation project(':model') + testImplementation 'org.awaitility:awaitility:4.1.1' testImplementation testFixtures(project(':model')) } @@ -26,4 +27,4 @@ jacocoTestCoverageVerification { } } } -} \ No newline at end of file +} diff --git a/grpc-adapter/src/main/java/de/cotto/lndmanagej/grpc/middleware/RequestAndResponseStreamObserver.java b/grpc-adapter/src/main/java/de/cotto/lndmanagej/grpc/middleware/RequestAndResponseStreamObserver.java index 5836663d..e98e301a 100644 --- a/grpc-adapter/src/main/java/de/cotto/lndmanagej/grpc/middleware/RequestAndResponseStreamObserver.java +++ b/grpc-adapter/src/main/java/de/cotto/lndmanagej/grpc/middleware/RequestAndResponseStreamObserver.java @@ -11,6 +11,8 @@ import lnrpc.RPCMiddlewareResponse; import javax.annotation.CheckForNull; import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; class RequestAndResponseStreamObserver implements StreamObserver { private static final String MIDDLEWARE_NAME = "lnd-manageJ"; @@ -27,6 +29,7 @@ class RequestAndResponseStreamObserver implements StreamObserver responseObserver; private final Multimap> requestListeners = ArrayListMultimap.create(); private final Multimap> responseListeners = ArrayListMultimap.create(); + private final ExecutorService executorService = Executors.newCachedThreadPool(); public RequestAndResponseStreamObserver() { // default constructor @@ -40,10 +43,13 @@ class RequestAndResponseStreamObserver implements StreamObserver { + handleRequest(value); + handleResponse(value); + }); } @Override diff --git a/grpc-adapter/src/test/java/de/cotto/lndmanagej/grpc/middleware/GrpcMiddlewareServiceTest.java b/grpc-adapter/src/test/java/de/cotto/lndmanagej/grpc/middleware/GrpcMiddlewareServiceTest.java index 3468dcf6..3d873655 100644 --- a/grpc-adapter/src/test/java/de/cotto/lndmanagej/grpc/middleware/GrpcMiddlewareServiceTest.java +++ b/grpc-adapter/src/test/java/de/cotto/lndmanagej/grpc/middleware/GrpcMiddlewareServiceTest.java @@ -14,7 +14,9 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import java.util.Set; +import java.util.concurrent.TimeUnit; +import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -68,7 +70,9 @@ class GrpcMiddlewareServiceTest { RPCMiddlewareRequest message = RPCMiddlewareRequest.newBuilder().setMsgId(123).setRequestId(456).setRequest(request).build(); streamRequestObserver.onNext(message); - verify(requestListener).acceptRequest(expectedPayload, 456); + await().atMost(1, TimeUnit.SECONDS).untilAsserted( + () -> verify(requestListener).acceptRequest(expectedPayload, 456) + ); } @Test @@ -78,7 +82,9 @@ class GrpcMiddlewareServiceTest { RPCMiddlewareRequest message = RPCMiddlewareRequest.newBuilder().setMsgId(123).setRequestId(456).setResponse(response).build(); streamRequestObserver.onNext(message); - verify(responseListener).acceptResponse(expectedPayload, 456); + await().atMost(1, TimeUnit.SECONDS).untilAsserted( + () -> verify(responseListener).acceptResponse(expectedPayload, 456) + ); } private boolean isRegistrationMessage(RPCMiddlewareResponse value) { diff --git a/grpc-adapter/src/test/java/de/cotto/lndmanagej/grpc/middleware/RequestAndResponseStreamObserverTest.java b/grpc-adapter/src/test/java/de/cotto/lndmanagej/grpc/middleware/RequestAndResponseStreamObserverTest.java index ccdf70e7..3f5ac601 100644 --- a/grpc-adapter/src/test/java/de/cotto/lndmanagej/grpc/middleware/RequestAndResponseStreamObserverTest.java +++ b/grpc-adapter/src/test/java/de/cotto/lndmanagej/grpc/middleware/RequestAndResponseStreamObserverTest.java @@ -13,8 +13,10 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import java.util.concurrent.TimeUnit; + import static org.assertj.core.api.Assertions.assertThatCode; -import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doThrow; @@ -73,14 +75,18 @@ class RequestAndResponseStreamObserverTest { long messageId = MESSAGE_ID; String payload = "request-payload"; observer.onNext(createRequestMessage(REQUEST_LISTENER_TYPE, messageId, payload)); - verify(requestListener).acceptRequest(ByteString.copyFromUtf8(payload), messageId); + await().atMost(1, TimeUnit.SECONDS).untilAsserted( + () -> verify(requestListener).acceptRequest(ByteString.copyFromUtf8(payload), messageId) + ); } @Test void ignores_listener_with_other_type_for_request() { observer.addRequestListener(requestListener); observer.onNext(createRequestMessage("not-listener-type", MESSAGE_ID, "xxx")); - verify(requestListener, never()).acceptRequest(any(ByteString.class), anyLong()); + await().atLeast(100, TimeUnit.MILLISECONDS).untilAsserted( + () -> verify(requestListener, never()).acceptRequest(any(ByteString.class), anyLong()) + ); } @Test @@ -89,31 +95,37 @@ class RequestAndResponseStreamObserverTest { long messageId = 456; String payload = "response-payload"; observer.onNext(createResponseMessage(RESPONSE_LISTENER_TYPE, messageId, payload)); - verify(responseListener).acceptResponse(ByteString.copyFromUtf8(payload), messageId); + await().atMost(1, TimeUnit.SECONDS).untilAsserted( + () -> verify(responseListener).acceptResponse(ByteString.copyFromUtf8(payload), messageId) + ); } @Test void ignores_listener_with_other_type_for_response() { observer.addResponseListener(responseListener); observer.onNext(createResponseMessage("not-listener-type", 456, "yyy")); - verify(responseListener, never()).acceptResponse(any(ByteString.class), anyLong()); + await().atLeast(100, TimeUnit.MILLISECONDS).untilAsserted( + () -> verify(responseListener, never()).acceptResponse(any(ByteString.class), anyLong()) + ); } @Test void acknowledges_message() { observer.addResponseListener(responseListener); observer.onNext(createResponseMessage("foo", 100, "zzz")); - verify(responseObserver).onNext(ackMessage()); + await().atMost(1, TimeUnit.SECONDS).untilAsserted( + () -> verify(responseObserver).onNext(ackMessage()) + ); } @Test void acknowledges_message_also_if_listener_crashes() { observer.addResponseListener(responseListener); doThrow(new NullPointerException()).when(responseListener).acceptResponse(any(ByteString.class), anyLong()); - assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> - observer.onNext(createResponseMessage(RESPONSE_LISTENER_TYPE, 100, "crash")) + observer.onNext(createResponseMessage(RESPONSE_LISTENER_TYPE, 100, "crash")); + await().atMost(1, TimeUnit.SECONDS).untilAsserted( + () -> verify(responseObserver).onNext(ackMessage()) ); - verify(responseObserver).onNext(ackMessage()); } private RPCMiddlewareResponse ackMessage() {