Merge branch 'main' into fix-5746

This commit is contained in:
Carsten Otto
2022-06-06 16:46:28 +02:00
12 changed files with 141 additions and 42 deletions

View File

@@ -8,10 +8,11 @@ import org.springframework.stereotype.Component;
import java.util.Collection;
@Component
public class GrpcMiddlewareService {
public class GrpcMiddlewareService implements ObserverIsDoneListener {
private final GrpcService grpcService;
private final Collection<RequestListener<?>> requestListeners;
private final Collection<ResponseListener<?>> responseListeners;
private boolean connected;
public GrpcMiddlewareService(
GrpcService grpcService,
@@ -24,13 +25,32 @@ public class GrpcMiddlewareService {
registerMiddleware();
}
public boolean isConnected() {
return connected;
}
@Override
public void onIsDone() {
connected = false;
sleep();
registerMiddleware();
}
private void registerMiddleware() {
connected = true;
RequestAndResponseStreamObserver requestAndResponseStreamObserver = new RequestAndResponseStreamObserver();
StreamObserver<RPCMiddlewareResponse> responseObserver =
grpcService.registerMiddleware(requestAndResponseStreamObserver);
requestAndResponseStreamObserver.initialize(responseObserver);
requestAndResponseStreamObserver.initialize(responseObserver, this);
responseListeners.forEach(requestAndResponseStreamObserver::addResponseListener);
requestListeners.forEach(requestAndResponseStreamObserver::addRequestListener);
}
private void sleep() {
try {
Thread.sleep(10_000);
} catch (InterruptedException e) {
// ignore
}
}
}

View File

@@ -0,0 +1,5 @@
package de.cotto.lndmanagej.grpc.middleware;
public interface ObserverIsDoneListener {
void onIsDone();
}

View File

@@ -27,6 +27,8 @@ class RequestAndResponseStreamObserver implements StreamObserver<RPCMiddlewareRe
@CheckForNull
private StreamObserver<RPCMiddlewareResponse> responseObserver;
@CheckForNull
private ObserverIsDoneListener observerIsDoneListener;
private final Multimap<String, RequestListener<?>> requestListeners = ArrayListMultimap.create();
private final Multimap<String, ResponseListener<?>> responseListeners = ArrayListMultimap.create();
private final ExecutorService executorService = Executors.newCachedThreadPool();
@@ -35,8 +37,12 @@ class RequestAndResponseStreamObserver implements StreamObserver<RPCMiddlewareRe
// default constructor
}
public void initialize(StreamObserver<RPCMiddlewareResponse> responseObserver) {
public void initialize(
StreamObserver<RPCMiddlewareResponse> responseObserver,
ObserverIsDoneListener observerIsDoneListener
) {
this.responseObserver = responseObserver;
this.observerIsDoneListener = observerIsDoneListener;
RPCMiddlewareResponse registrationMessage =
RPCMiddlewareResponse.newBuilder().setRegister(REGISTRATION).build();
responseObserver.onNext(registrationMessage);
@@ -54,12 +60,12 @@ class RequestAndResponseStreamObserver implements StreamObserver<RPCMiddlewareRe
@Override
public void onError(Throwable throwable) {
// ignore
Objects.requireNonNull(observerIsDoneListener).onIsDone();
}
@Override
public void onCompleted() {
// ignore
Objects.requireNonNull(observerIsDoneListener).onIsDone();
}
private void handleRequest(RPCMiddlewareRequest value) {

View File

@@ -16,6 +16,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.verify;
@@ -40,6 +41,8 @@ class GrpcMiddlewareServiceTest {
@Mock
private ResponseListener<?> responseListener;
private GrpcMiddlewareService grpcMiddlewareService;
@BeforeEach
@SuppressWarnings("unchecked")
void setUp() {
@@ -47,7 +50,8 @@ class GrpcMiddlewareServiceTest {
when(responseListener.getResponseType()).thenReturn(RESPONSE_TYPE);
ArgumentCaptor<StreamObserver<RPCMiddlewareRequest>> captor = ArgumentCaptor.forClass(StreamObserver.class);
when(grpcService.registerMiddleware(captor.capture())).thenReturn(streamResponseObserver);
new GrpcMiddlewareService(grpcService, Set.of(requestListener), Set.of(responseListener));
grpcMiddlewareService =
new GrpcMiddlewareService(grpcService, Set.of(requestListener), Set.of(responseListener));
streamRequestObserver = captor.getValue();
}
@@ -87,6 +91,11 @@ class GrpcMiddlewareServiceTest {
);
}
@Test
void isConnected() {
assertThat(grpcMiddlewareService.isConnected()).isTrue();
}
private boolean isRegistrationMessage(RPCMiddlewareResponse value) {
return value.hasRegister()
&& value.getRegister().getReadOnlyMode()

View File

@@ -15,7 +15,6 @@ import org.mockito.junit.jupiter.MockitoExtension;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -43,21 +42,26 @@ class RequestAndResponseStreamObserverTest {
@Mock
private ResponseListener<String> responseListener;
@Mock
private ObserverIsDoneListener observerIsDoneListener;
@BeforeEach
void setUp() {
lenient().when(requestListener.getRequestType()).thenReturn(REQUEST_LISTENER_TYPE);
lenient().when(responseListener.getResponseType()).thenReturn(RESPONSE_LISTENER_TYPE);
observer.initialize(responseObserver);
observer.initialize(responseObserver, observerIsDoneListener);
}
@Test
void onError() {
assertThatCode(() -> observer.onError(new NullPointerException())).doesNotThrowAnyException();
observer.onError(new NullPointerException());
verify(observerIsDoneListener).onIsDone();
}
@Test
void onCompleted() {
assertThatCode(() -> observer.onCompleted()).doesNotThrowAnyException();
observer.onCompleted();
verify(observerIsDoneListener).onIsDone();
}
@Nested

View File

@@ -21,13 +21,15 @@ public class SettledInvoices {
@Scheduled(fixedDelay = 10, timeUnit = TimeUnit.SECONDS)
public void refresh() {
List<SettledInvoice> settledInvoices;
List<SettledInvoice> validInvoices;
do {
settledInvoices = grpcInvoices.getSettledInvoicesAfter(dao.getAddIndexOffset()).orElse(null);
if (settledInvoices == null) {
return;
}
dao.save(settledInvoices.stream().filter(SettledInvoice::isValid).toList());
} while (settledInvoices.size() == grpcInvoices.getLimit());
validInvoices = settledInvoices.stream().filter(SettledInvoice::isValid).toList();
dao.save(validInvoices);
} while (settledInvoices.size() == grpcInvoices.getLimit() && !validInvoices.isEmpty());
grpcInvoices.getNewSettledInvoicesAfter(dao.getSettleIndexOffset())
.filter(SettledInvoice::isValid)

View File

@@ -4,6 +4,7 @@ import de.cotto.lndmanagej.grpc.GrpcInvoices;
import de.cotto.lndmanagej.model.SettledInvoice;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InOrder;
import org.mockito.InjectMocks;
@@ -96,6 +97,14 @@ class SettledInvoicesTest {
verify(dao).save(List.of(SETTLED_INVOICE_2));
}
@Test
@Timeout(1)
void refresh_terminates_if_there_are_only_invalid_invoices() {
when(grpcInvoices.getSettledInvoicesAfter(ADD_INDEX_OFFSET))
.thenReturn(Optional.of(List.of(SettledInvoice.INVALID)));
settledInvoices.refresh();
}
@Test
void refresh_saves_invoices_from_subscription() {
mockEmptyGetReply();

View File

@@ -3,12 +3,13 @@ package de.cotto.lndmanagej.pickhardtpayments;
import com.google.common.collect.Sets;
import de.cotto.lndmanagej.grpc.GrpcGetInfo;
import de.cotto.lndmanagej.grpc.GrpcGraph;
import de.cotto.lndmanagej.grpc.middleware.GrpcMiddlewareService;
import de.cotto.lndmanagej.model.ChannelId;
import de.cotto.lndmanagej.model.Coins;
import de.cotto.lndmanagej.model.DirectedChannelEdge;
import de.cotto.lndmanagej.model.Edge;
import de.cotto.lndmanagej.model.EdgeWithLiquidityInformation;
import de.cotto.lndmanagej.model.LocalChannel;
import de.cotto.lndmanagej.model.LocalOpenChannel;
import de.cotto.lndmanagej.model.Policy;
import de.cotto.lndmanagej.model.Pubkey;
import de.cotto.lndmanagej.pickhardtpayments.model.EdgesWithLiquidityInformation;
@@ -16,7 +17,6 @@ import de.cotto.lndmanagej.pickhardtpayments.model.PaymentOptions;
import de.cotto.lndmanagej.service.BalanceService;
import de.cotto.lndmanagej.service.ChannelService;
import de.cotto.lndmanagej.service.LiquidityBoundsService;
import de.cotto.lndmanagej.service.NodeService;
import de.cotto.lndmanagej.service.RouteHintService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,33 +34,37 @@ public class EdgeComputation {
private final GrpcGraph grpcGraph;
private final GrpcGetInfo grpcGetInfo;
private final ChannelService channelService;
private final NodeService nodeService;
private final BalanceService balanceService;
private final LiquidityBoundsService liquidityBoundsService;
private final RouteHintService routeHintService;
private final GrpcMiddlewareService grpcMiddlewareService;
public EdgeComputation(
GrpcGraph grpcGraph,
GrpcGetInfo grpcGetInfo,
ChannelService channelService,
NodeService nodeService,
BalanceService balanceService,
LiquidityBoundsService liquidityBoundsService,
RouteHintService routeHintService
RouteHintService routeHintService,
GrpcMiddlewareService grpcMiddlewareService
) {
this.grpcGraph = grpcGraph;
this.grpcGetInfo = grpcGetInfo;
this.channelService = channelService;
this.nodeService = nodeService;
this.balanceService = balanceService;
this.liquidityBoundsService = liquidityBoundsService;
this.routeHintService = routeHintService;
this.grpcMiddlewareService = grpcMiddlewareService;
}
public EdgesWithLiquidityInformation getEdges(PaymentOptions paymentOptions) {
if (noMiddlewareSupport()) {
logger.error("Middleware needs to be connected");
return EdgesWithLiquidityInformation.EMPTY;
}
Set<DirectedChannelEdge> channelEdges = grpcGraph.getChannelEdges().orElse(null);
if (channelEdges == null) {
logger.warn("Unable to get graph");
logger.error("Unable to get graph");
return EdgesWithLiquidityInformation.EMPTY;
}
Set<EdgeWithLiquidityInformation> edgesWithLiquidityInformation = new LinkedHashSet<>();
@@ -156,11 +160,11 @@ public class EdgeComputation {
}
private Optional<Coins> getLocalChannelAvailable(ChannelId channelId, Function<ChannelId, Coins> balanceProvider) {
LocalChannel localChannel = channelService.getLocalChannel(channelId).orElse(null);
LocalOpenChannel localChannel = channelService.getOpenChannel(channelId).orElse(null);
if (localChannel == null) {
return Optional.of(Coins.NONE);
}
if (nodeService.getNode(localChannel.getRemotePubkey()).online()) {
if (localChannel.getStatus().active()) {
return Optional.of(balanceProvider.apply(channelId));
}
return Optional.of(Coins.NONE);
@@ -170,4 +174,8 @@ public class EdgeComputation {
Coins upperBound = liquidityBoundsService.getAssumedLiquidityUpperBound(edge).orElse(null);
return edge.capacity().minimum(upperBound).maximum(lowerBound);
}
private boolean noMiddlewareSupport() {
return !grpcMiddlewareService.isConnected();
}
}

View File

@@ -2,11 +2,14 @@ package de.cotto.lndmanagej.pickhardtpayments;
import de.cotto.lndmanagej.grpc.GrpcGetInfo;
import de.cotto.lndmanagej.grpc.GrpcGraph;
import de.cotto.lndmanagej.grpc.middleware.GrpcMiddlewareService;
import de.cotto.lndmanagej.model.ChannelCoreInformation;
import de.cotto.lndmanagej.model.Coins;
import de.cotto.lndmanagej.model.DirectedChannelEdge;
import de.cotto.lndmanagej.model.Edge;
import de.cotto.lndmanagej.model.EdgeWithLiquidityInformation;
import de.cotto.lndmanagej.model.Node;
import de.cotto.lndmanagej.model.LocalOpenChannel;
import de.cotto.lndmanagej.model.LocalOpenChannelFixtures;
import de.cotto.lndmanagej.model.Policy;
import de.cotto.lndmanagej.model.Pubkey;
import de.cotto.lndmanagej.pickhardtpayments.model.PaymentOptions;
@@ -25,13 +28,16 @@ import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Optional;
import java.util.Set;
import static de.cotto.lndmanagej.model.BalanceInformationFixtures.BALANCE_INFORMATION;
import static de.cotto.lndmanagej.model.ChannelFixtures.CAPACITY;
import static de.cotto.lndmanagej.model.ChannelIdFixtures.CHANNEL_ID;
import static de.cotto.lndmanagej.model.ChannelIdFixtures.CHANNEL_ID_2;
import static de.cotto.lndmanagej.model.ChannelIdFixtures.CHANNEL_ID_3;
import static de.cotto.lndmanagej.model.ChannelPointFixtures.CHANNEL_POINT;
import static de.cotto.lndmanagej.model.EdgeFixtures.EDGE;
import static de.cotto.lndmanagej.model.LocalOpenChannelFixtures.LOCAL_OPEN_CHANNEL;
import static de.cotto.lndmanagej.model.NodeFixtures.NODE_PEER;
import static de.cotto.lndmanagej.model.OpenInitiator.LOCAL;
import static de.cotto.lndmanagej.model.PolicyFixtures.POLICY_1;
import static de.cotto.lndmanagej.model.PolicyFixtures.POLICY_DISABLED;
import static de.cotto.lndmanagej.model.PolicyFixtures.POLICY_WITH_BASE_FEE;
@@ -72,11 +78,15 @@ class EdgeComputationTest {
@Mock
private RouteHintService routeHintService;
@Mock
private GrpcMiddlewareService grpcMiddlewareService;
@BeforeEach
void setUp() {
lenient().when(grpcGetInfo.getPubkey()).thenReturn(PUBKEY_4);
lenient().when(nodeService.getNode(any())).thenReturn(NODE_PEER);
lenient().when(liquidityBoundsService.getAssumedLiquidityLowerBound(any())).thenReturn(Coins.NONE);
lenient().when(grpcMiddlewareService.isConnected()).thenReturn(true);
}
@Test
@@ -84,6 +94,12 @@ class EdgeComputationTest {
assertThat(edgeComputation.getEdges(DEFAULT_PAYMENT_OPTIONS).edges()).isEmpty();
}
@Test
void middleware_not_connected() {
when(grpcMiddlewareService.isConnected()).thenReturn(false);
assertThat(edgeComputation.getEdges(DEFAULT_PAYMENT_OPTIONS).edges()).isEmpty();
}
@Test
void does_not_add_edge_for_disabled_channel() {
DirectedChannelEdge edge = new DirectedChannelEdge(CHANNEL_ID, CAPACITY, PUBKEY, PUBKEY_2, POLICY_DISABLED);
@@ -163,7 +179,7 @@ class EdgeComputationTest {
void adds_liquidity_information_for_local_channel_as_source() {
mockEdge();
when(grpcGetInfo.getPubkey()).thenReturn(EDGE.startNode());
when(channelService.getLocalChannel(EDGE.channelId())).thenReturn(Optional.of(LOCAL_OPEN_CHANNEL));
when(channelService.getOpenChannel(EDGE.channelId())).thenReturn(Optional.of(LOCAL_OPEN_CHANNEL));
Coins knownLiquidity = Coins.ofSatoshis(4_567);
Coins availableKnownLiquidity = getAvailableKnownLiquidity(knownLiquidity);
when(balanceService.getAvailableLocalBalance(EDGE.channelId())).thenReturn(knownLiquidity);
@@ -201,9 +217,9 @@ class EdgeComputationTest {
}
@Test
void reduces_liquidity_to_zero_for_offline_peer_as_end_node() {
void reduces_liquidity_to_zero_for_inactive_channel_as_last_hop() {
mockEdge();
mockOfflinePeer();
mockInactiveChannel();
when(grpcGetInfo.getPubkey()).thenReturn(EDGE.startNode());
assertThat(edgeComputation.getEdges(DEFAULT_PAYMENT_OPTIONS).edges())
@@ -215,7 +231,7 @@ class EdgeComputationTest {
void adds_liquidity_information_for_local_channel_as_target() {
mockEdge();
when(grpcGetInfo.getPubkey()).thenReturn(EDGE.endNode());
when(channelService.getLocalChannel(EDGE.channelId())).thenReturn(Optional.of(LOCAL_OPEN_CHANNEL));
when(channelService.getOpenChannel(EDGE.channelId())).thenReturn(Optional.of(LOCAL_OPEN_CHANNEL));
Coins knownLiquidity = Coins.ofSatoshis(4_567);
Coins availableKnownLiquidity = getAvailableKnownLiquidity(knownLiquidity);
when(balanceService.getAvailableRemoteBalance(EDGE.channelId())).thenReturn(knownLiquidity);
@@ -233,22 +249,18 @@ class EdgeComputationTest {
.contains(EdgeWithLiquidityInformation.forKnownLiquidity(EDGE, Coins.NONE));
}
// CPD-OFF
@Test
void reduces_liquidity_to_zero_for_offline_peer_as_start_node() {
void reduces_liquidity_to_zero_for_inactive_channel_as_first_hop() {
mockEdge();
mockOfflinePeer();
mockInactiveChannel();
when(grpcGetInfo.getPubkey()).thenReturn(EDGE.endNode());
assertThat(edgeComputation.getEdges(DEFAULT_PAYMENT_OPTIONS).edges())
.contains(EdgeWithLiquidityInformation.forKnownLiquidity(EDGE, Coins.NONE));
verify(balanceService, never()).getAvailableLocalBalance(EDGE.channelId());
}
private void mockOfflinePeer() {
Pubkey remotePubkey = LOCAL_OPEN_CHANNEL.getRemotePubkey();
when(nodeService.getNode(remotePubkey)).thenReturn(new Node(remotePubkey, "", 0, false));
when(channelService.getLocalChannel(EDGE.channelId())).thenReturn(Optional.of(LOCAL_OPEN_CHANNEL));
}
// CPD-ON
@Test
void adds_upper_bound_from_liquidity_bounds_service() {
@@ -278,18 +290,26 @@ class EdgeComputationTest {
when(grpcGetInfo.getPubkey()).thenReturn(PUBKEY);
Coins knownLiquidity = Coins.ofSatoshis(4_567);
Coins availableKnownLiquidity = getAvailableKnownLiquidity(knownLiquidity);
when(channelService.getLocalChannel(EDGE.channelId())).thenReturn(Optional.of(LOCAL_OPEN_CHANNEL));
when(channelService.getOpenChannel(EDGE.channelId())).thenReturn(Optional.of(LOCAL_OPEN_CHANNEL));
when(balanceService.getAvailableLocalBalance(EDGE.channelId())).thenReturn(knownLiquidity);
assertThat(edgeComputation.getEdgeWithLiquidityInformation(EDGE))
.isEqualTo(EdgeWithLiquidityInformation.forKnownLiquidity(EDGE, availableKnownLiquidity));
}
@Test
void getEdgeWithLiquidityInformation_first_node_is_own_node_but_channel_is_inactive() {
mockInactiveChannel();
when(grpcGetInfo.getPubkey()).thenReturn(PUBKEY);
assertThat(edgeComputation.getEdgeWithLiquidityInformation(EDGE))
.isEqualTo(EdgeWithLiquidityInformation.forKnownLiquidity(EDGE, Coins.NONE));
}
@Test
void getEdgeWithLiquidityInformation_second_node_is_own_node() {
when(grpcGetInfo.getPubkey()).thenReturn(PUBKEY_2);
Coins knownLiquidity = Coins.ofSatoshis(4_567);
Coins availableKnownLiquidity = getAvailableKnownLiquidity(knownLiquidity);
when(channelService.getLocalChannel(EDGE.channelId())).thenReturn(Optional.of(LOCAL_OPEN_CHANNEL));
when(channelService.getOpenChannel(EDGE.channelId())).thenReturn(Optional.of(LOCAL_OPEN_CHANNEL));
when(balanceService.getAvailableRemoteBalance(EDGE.channelId())).thenReturn(knownLiquidity);
assertThat(edgeComputation.getEdgeWithLiquidityInformation(EDGE))
.isEqualTo(EdgeWithLiquidityInformation.forKnownLiquidity(EDGE, availableKnownLiquidity));
@@ -369,4 +389,20 @@ class EdgeComputationTest {
private static Policy policy(int feeRate) {
return new Policy(feeRate, Coins.NONE, true, 40, Coins.ofSatoshis(0));
}
private void mockInactiveChannel() {
LocalOpenChannel inactiveChannel = new LocalOpenChannel(
new ChannelCoreInformation(CHANNEL_ID, CHANNEL_POINT, CAPACITY),
PUBKEY,
PUBKEY_2,
BALANCE_INFORMATION,
LOCAL,
LocalOpenChannelFixtures.TOTAL_SENT,
LocalOpenChannelFixtures.TOTAL_RECEIVED,
false,
false,
LocalOpenChannelFixtures.NUM_UPDATES
);
when(channelService.getOpenChannel(EDGE.channelId())).thenReturn(Optional.of(inactiveChannel));
}
}

View File

@@ -41,8 +41,8 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
@SuppressWarnings("CPD-START")
@Import({ObjectMapperConfiguration.class, PaymentStatusStream.class})
@WebMvcTest(controllers = PickhardtPaymentsController.class)
class PickhardtPaymentsControllerIT {
@WebMvcTest(controllers = PaymentsController.class)
class PaymentsControllerIT {
private static final String PREFIX = "/api/payments";
private static final String PAYMENT_REQUEST = "xxx";
private static final PaymentOptions PAYMENT_OPTIONS = new PaymentOptions(

View File

@@ -25,7 +25,7 @@ import static org.springframework.http.MediaType.APPLICATION_NDJSON;
@RestController
@RequestMapping("/api/payments/")
public class PickhardtPaymentsController {
public class PaymentsController {
private static final PaymentOptionsDto PAYMENT_OPTIONS_DTO = PaymentOptionsDto.DEFAULT;
private final MultiPathPaymentSplitter multiPathPaymentSplitter;
@@ -34,7 +34,7 @@ public class PickhardtPaymentsController {
private final TopUpService topUpService;
private final GraphService graphService;
public PickhardtPaymentsController(
public PaymentsController(
MultiPathPaymentSplitter multiPathPaymentSplitter,
MultiPathPaymentSender multiPathPaymentSender,
PaymentStatusStream paymentStatusStream,

View File

@@ -32,7 +32,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class PickhardtPaymentsControllerTest {
class PaymentsControllerTest {
private static final String PAYMENT_REQUEST = "xxx";
private static final String STREAM_RESPONSE = "beep beep boop!";
@@ -55,7 +55,7 @@ class PickhardtPaymentsControllerTest {
}
@InjectMocks
private PickhardtPaymentsController controller;
private PaymentsController controller;
@Mock
private MultiPathPaymentSplitter multiPathPaymentSplitter;