diff --git a/grpc-adapter/src/main/java/de/cotto/lndmanagej/grpc/GrpcInvoices.java b/grpc-adapter/src/main/java/de/cotto/lndmanagej/grpc/GrpcInvoices.java index 1bc19869..b15c86bb 100644 --- a/grpc-adapter/src/main/java/de/cotto/lndmanagej/grpc/GrpcInvoices.java +++ b/grpc-adapter/src/main/java/de/cotto/lndmanagej/grpc/GrpcInvoices.java @@ -9,8 +9,12 @@ import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.HexFormat; +import java.util.Iterator; import java.util.List; import java.util.Optional; +import java.util.Spliterators; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; @Component public class GrpcInvoices { @@ -23,6 +27,10 @@ public class GrpcInvoices { this.grpcService = grpcService; } + public int getLimit() { + return LIMIT; + } + public Optional> getSettledInvoicesAfter(long offset) { ListInvoiceResponse list = grpcService.getInvoices(offset, LIMIT).orElse(null); if (list == null) { @@ -33,6 +41,14 @@ public class GrpcInvoices { .toList()); } + public Stream getNewSettledInvoicesAfter(long settleIndexOffset) { + return grpcService.subscribeToSettledInvoices(settleIndexOffset) + .map(this::toStream) + .orElse(Stream.of()) + .map(this::toSettledInvoice) + .filter(SettledInvoice::isValid); + } + private SettledInvoice toSettledInvoice(Invoice lndInvoice) { if (lndInvoice.getState() != Invoice.InvoiceState.SETTLED) { return SettledInvoice.INVALID; @@ -47,7 +63,7 @@ public class GrpcInvoices { ); } - public int getLimit() { - return LIMIT; + private Stream toStream(Iterator iterator) { + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false); } } diff --git a/grpc-adapter/src/main/java/de/cotto/lndmanagej/grpc/GrpcService.java b/grpc-adapter/src/main/java/de/cotto/lndmanagej/grpc/GrpcService.java index 45c1fa0a..88a1e22c 100644 --- a/grpc-adapter/src/main/java/de/cotto/lndmanagej/grpc/GrpcService.java +++ b/grpc-adapter/src/main/java/de/cotto/lndmanagej/grpc/GrpcService.java @@ -17,6 +17,8 @@ import lnrpc.ForwardingHistoryRequest; import lnrpc.ForwardingHistoryResponse; import lnrpc.GetInfoResponse; import lnrpc.GetTransactionsRequest; +import lnrpc.Invoice; +import lnrpc.InvoiceSubscription; import lnrpc.LightningGrpc; import lnrpc.ListChannelsRequest; import lnrpc.ListInvoiceRequest; @@ -34,6 +36,7 @@ import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; import java.io.IOException; import java.time.Duration; +import java.util.Iterator; import java.util.List; import java.util.Optional; @@ -155,6 +158,13 @@ public class GrpcService extends GrpcBase { return get(() -> lightningStub.listInvoices(request)); } + @Timed + public Optional> subscribeToSettledInvoices(long settleIndex) { + return get(() -> lightningStub.subscribeInvoices(InvoiceSubscription.newBuilder() + .setSettleIndex(settleIndex) + .build())); + } + @Timed public List listPeersWithoutCache() { return get(() -> lightningStub.listPeers(ListPeersRequest.getDefaultInstance()).getPeersList()) diff --git a/grpc-adapter/src/test/java/de/cotto/lndmanagej/grpc/GrpcInvoicesTest.java b/grpc-adapter/src/test/java/de/cotto/lndmanagej/grpc/GrpcInvoicesTest.java index b40034d6..741bbfa8 100644 --- a/grpc-adapter/src/test/java/de/cotto/lndmanagej/grpc/GrpcInvoicesTest.java +++ b/grpc-adapter/src/test/java/de/cotto/lndmanagej/grpc/GrpcInvoicesTest.java @@ -4,6 +4,7 @@ import com.google.protobuf.ByteString; import de.cotto.lndmanagej.model.SettledInvoice; import lnrpc.Invoice; import lnrpc.ListInvoiceResponse; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InjectMocks; @@ -12,6 +13,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import javax.annotation.Nullable; import java.time.ZoneOffset; +import java.util.Collections; import java.util.HexFormat; import java.util.List; import java.util.Optional; @@ -19,6 +21,7 @@ import java.util.Optional; import static de.cotto.lndmanagej.model.SettledInvoiceFixtures.SETTLED_INVOICE; import static de.cotto.lndmanagej.model.SettledInvoiceFixtures.SETTLED_INVOICE_2; import static lnrpc.Invoice.InvoiceState.ACCEPTED; +import static lnrpc.Invoice.InvoiceState.CANCELED; import static lnrpc.Invoice.InvoiceState.OPEN; import static lnrpc.Invoice.InvoiceState.SETTLED; import static org.assertj.core.api.Assertions.assertThat; @@ -30,7 +33,6 @@ import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) class GrpcInvoicesTest { - private static final long OFFSET = 123; private static final int LIMIT = 1_000; private static final HexFormat HEX_FORMAT = HexFormat.of(); @@ -40,53 +42,108 @@ class GrpcInvoicesTest { @Mock private GrpcService grpcService; - @Test - void empty_optional() { - when(grpcService.getInvoices(anyLong(), anyInt())).thenReturn(Optional.empty()); - assertThat(grpcInvoices.getSettledInvoicesAfter(0L)).isEmpty(); + @Nested + class BulkGet { + private static final long ADD_INDEX_OFFSET = 123; + + @Test + void empty_optional() { + when(grpcService.getInvoices(anyLong(), anyInt())).thenReturn(Optional.empty()); + assertThat(grpcInvoices.getSettledInvoicesAfter(0L)).isEmpty(); + } + + @Test + void no_invoice() { + ListInvoiceResponse response = ListInvoiceResponse.newBuilder().build(); + when(grpcService.getInvoices(anyLong(), anyInt())).thenReturn(Optional.of(response)); + assertThat(grpcInvoices.getSettledInvoicesAfter(0L)).contains(List.of()); + } + + @Test + void with_events() { + ListInvoiceResponse response = ListInvoiceResponse.newBuilder() + .addInvoices(invoice(SETTLED, SETTLED_INVOICE)) + .addInvoices(invoice(SETTLED, SETTLED_INVOICE_2)) + .build(); + when(grpcService.getInvoices(anyLong(), anyInt())).thenReturn(Optional.of(response)); + assertThat(grpcInvoices.getSettledInvoicesAfter(0L)).contains( + List.of(SETTLED_INVOICE, SETTLED_INVOICE_2) + ); + } + + @Test + void returns_non_settled_invoices_as_invalid() { + ListInvoiceResponse response = ListInvoiceResponse.newBuilder() + .addInvoices(invoice(ACCEPTED, null)) + .addInvoices(invoice(OPEN, null)) + .build(); + when(grpcService.getInvoices(anyLong(), anyInt())).thenReturn(Optional.of(response)); + assertThat(grpcInvoices.getSettledInvoicesAfter(0L)).contains( + List.of(SettledInvoice.INVALID, SettledInvoice.INVALID) + ); + } + + @Test + void starts_at_the_beginning() { + grpcInvoices.getSettledInvoicesAfter(ADD_INDEX_OFFSET); + verify(grpcService).getInvoices(eq(ADD_INDEX_OFFSET), anyInt()); + } + + @Test + void uses_limit() { + grpcInvoices.getSettledInvoicesAfter(ADD_INDEX_OFFSET); + verify(grpcService).getInvoices(ADD_INDEX_OFFSET, LIMIT); + } } - @Test - void no_invoice() { - ListInvoiceResponse response = ListInvoiceResponse.newBuilder().build(); - when(grpcService.getInvoices(anyLong(), anyInt())).thenReturn(Optional.of(response)); - assertThat(grpcInvoices.getSettledInvoicesAfter(0L)).contains(List.of()); - } + @Nested + class Subscription { + private static final long SETTLE_INDEX_OFFSET = 999; - @Test - void with_events() { - ListInvoiceResponse response = ListInvoiceResponse.newBuilder() - .addInvoices(invoice(SETTLED, SETTLED_INVOICE)) - .addInvoices(invoice(SETTLED, SETTLED_INVOICE_2)) - .build(); - when(grpcService.getInvoices(anyLong(), anyInt())).thenReturn(Optional.of(response)); - assertThat(grpcInvoices.getSettledInvoicesAfter(0L)).contains( - List.of(SETTLED_INVOICE, SETTLED_INVOICE_2) - ); - } + @Test + void empty() { + when(grpcService.subscribeToSettledInvoices(anyLong())).thenReturn(Optional.empty()); + assertThat(grpcInvoices.getNewSettledInvoicesAfter(SETTLE_INDEX_OFFSET)).isEmpty(); + } - @Test - void returns_non_settled_invoices_as_invalid() { - ListInvoiceResponse response = ListInvoiceResponse.newBuilder() - .addInvoices(invoice(ACCEPTED, null)) - .addInvoices(invoice(OPEN, null)) - .build(); - when(grpcService.getInvoices(anyLong(), anyInt())).thenReturn(Optional.of(response)); - assertThat(grpcInvoices.getSettledInvoicesAfter(0L)).contains( - List.of(SettledInvoice.INVALID, SettledInvoice.INVALID) - ); - } + @Test + void one_invoice() { + when(grpcService.subscribeToSettledInvoices(SETTLE_INDEX_OFFSET)) + .thenReturn(Optional.of(List.of(invoice(SETTLED, SETTLED_INVOICE)).iterator())); + assertThat(grpcInvoices.getNewSettledInvoicesAfter(SETTLE_INDEX_OFFSET)).containsExactly(SETTLED_INVOICE); + } - @Test - void starts_at_the_beginning() { - grpcInvoices.getSettledInvoicesAfter(OFFSET); - verify(grpcService).getInvoices(eq(OFFSET), anyInt()); - } + @Test + void two_invoices() { + List invoices = List.of( + invoice(SETTLED, SETTLED_INVOICE), + invoice(SETTLED, SETTLED_INVOICE_2) + ); + when(grpcService.subscribeToSettledInvoices(SETTLE_INDEX_OFFSET)) + .thenReturn(Optional.of(invoices.iterator())); + assertThat(grpcInvoices.getNewSettledInvoicesAfter(SETTLE_INDEX_OFFSET)) + .containsExactly(SETTLED_INVOICE, SETTLED_INVOICE_2); + } - @Test - void uses_limit() { - grpcInvoices.getSettledInvoicesAfter(OFFSET); - verify(grpcService).getInvoices(OFFSET, LIMIT); + @Test + void ignores_invalid_invoice() { + List invoices = List.of( + invoice(SETTLED, SETTLED_INVOICE_2), + invoice(CANCELED, null), + invoice(SETTLED, SETTLED_INVOICE) + ); + when(grpcService.subscribeToSettledInvoices(SETTLE_INDEX_OFFSET)) + .thenReturn(Optional.of(invoices.iterator())); + assertThat(grpcInvoices.getNewSettledInvoicesAfter(SETTLE_INDEX_OFFSET)) + .containsExactly(SETTLED_INVOICE_2, SETTLED_INVOICE); + } + + @Test + void empty_iterator() { + when(grpcService.subscribeToSettledInvoices(SETTLE_INDEX_OFFSET)) + .thenReturn(Optional.of(Collections.emptyIterator())); + assertThat(grpcInvoices.getNewSettledInvoicesAfter(SETTLE_INDEX_OFFSET)).isEmpty(); + } } @Test diff --git a/invoices/src/integrationTest/java/de/cotto/lndmanagej/invoices/persistence/SettledInvoicesRepositoryIT.java b/invoices/src/integrationTest/java/de/cotto/lndmanagej/invoices/persistence/SettledInvoicesRepositoryIT.java index 9e9f1bbb..4b43c831 100644 --- a/invoices/src/integrationTest/java/de/cotto/lndmanagej/invoices/persistence/SettledInvoicesRepositoryIT.java +++ b/invoices/src/integrationTest/java/de/cotto/lndmanagej/invoices/persistence/SettledInvoicesRepositoryIT.java @@ -15,6 +15,21 @@ class SettledInvoicesRepositoryIT { @Autowired private SettledInvoicesRepository repository; + @Test + void getMaxSettledIndex_no_invoice() { + assertThat(repository.getMaxSettledIndex()).isEqualTo(0); + } + + @Test + void getMaxSettledIndex_with_gaps_in_addIndex() { + repository.save(invoice(1, 1)); + repository.save(invoice(2, 2)); + repository.save(invoice(5, 3)); + repository.save(invoice(6, 5)); + repository.save(invoice(7, 4)); + assertThat(repository.getMaxSettledIndex()).isEqualTo(5); + } + @Test void getMaxAddIndexWithoutGaps_no_invoice() { assertThat(repository.getMaxAddIndexWithoutGaps()).isEqualTo(0); diff --git a/invoices/src/main/java/de/cotto/lndmanagej/invoices/SettledInvoices.java b/invoices/src/main/java/de/cotto/lndmanagej/invoices/SettledInvoices.java index 9a36d267..034162eb 100644 --- a/invoices/src/main/java/de/cotto/lndmanagej/invoices/SettledInvoices.java +++ b/invoices/src/main/java/de/cotto/lndmanagej/invoices/SettledInvoices.java @@ -24,8 +24,12 @@ public class SettledInvoices { public void refresh() { List settledInvoices; do { - settledInvoices = grpcInvoices.getSettledInvoicesAfter(dao.getOffset()).orElse(List.of()); + settledInvoices = grpcInvoices.getSettledInvoicesAfter(dao.getAddIndexOffset()).orElse(List.of()); dao.save(settledInvoices.stream().filter(SettledInvoice::isValid).collect(toList())); } while (settledInvoices.size() == grpcInvoices.getLimit()); + + grpcInvoices.getNewSettledInvoicesAfter(dao.getSettleIndexOffset()) + .filter(SettledInvoice::isValid) + .forEach(dao::save); } } diff --git a/invoices/src/main/java/de/cotto/lndmanagej/invoices/SettledInvoicesDao.java b/invoices/src/main/java/de/cotto/lndmanagej/invoices/SettledInvoicesDao.java index d469d534..c0c41359 100644 --- a/invoices/src/main/java/de/cotto/lndmanagej/invoices/SettledInvoicesDao.java +++ b/invoices/src/main/java/de/cotto/lndmanagej/invoices/SettledInvoicesDao.java @@ -7,5 +7,9 @@ import java.util.Collection; public interface SettledInvoicesDao { void save(Collection settledInvoices); - long getOffset(); + void save(SettledInvoice settledInvoice); + + long getAddIndexOffset(); + + long getSettleIndexOffset(); } diff --git a/invoices/src/main/java/de/cotto/lndmanagej/invoices/persistence/SettledInvoicesDaoImpl.java b/invoices/src/main/java/de/cotto/lndmanagej/invoices/persistence/SettledInvoicesDaoImpl.java index 659ff150..9696da11 100644 --- a/invoices/src/main/java/de/cotto/lndmanagej/invoices/persistence/SettledInvoicesDaoImpl.java +++ b/invoices/src/main/java/de/cotto/lndmanagej/invoices/persistence/SettledInvoicesDaoImpl.java @@ -28,7 +28,17 @@ public class SettledInvoicesDaoImpl implements SettledInvoicesDao { } @Override - public long getOffset() { + public void save(SettledInvoice settledInvoice) { + repository.save(SettledInvoiceJpaDto.createFromInvoice(settledInvoice)); + } + + @Override + public long getAddIndexOffset() { return repository.getMaxAddIndexWithoutGaps(); } + + @Override + public long getSettleIndexOffset() { + return repository.getMaxSettledIndex(); + } } diff --git a/invoices/src/main/java/de/cotto/lndmanagej/invoices/persistence/SettledInvoicesRepository.java b/invoices/src/main/java/de/cotto/lndmanagej/invoices/persistence/SettledInvoicesRepository.java index f3cca9cb..ffbe1356 100644 --- a/invoices/src/main/java/de/cotto/lndmanagej/invoices/persistence/SettledInvoicesRepository.java +++ b/invoices/src/main/java/de/cotto/lndmanagej/invoices/persistence/SettledInvoicesRepository.java @@ -7,4 +7,7 @@ public interface SettledInvoicesRepository extends JpaRepository jpaDto.getMemo().equals(SETTLED_INVOICE.memo()))); } @Test