subscribe to new invoices after loading all in bulk

This commit is contained in:
Carsten Otto
2021-12-02 22:21:53 +01:00
parent ec1ac9344d
commit 3363e782ac
10 changed files with 227 additions and 57 deletions

View File

@@ -9,8 +9,12 @@ import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HexFormat;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
@Component
public class GrpcInvoices {
@@ -23,6 +27,10 @@ public class GrpcInvoices {
this.grpcService = grpcService;
}
public int getLimit() {
return LIMIT;
}
public Optional<List<SettledInvoice>> getSettledInvoicesAfter(long offset) {
ListInvoiceResponse list = grpcService.getInvoices(offset, LIMIT).orElse(null);
if (list == null) {
@@ -33,6 +41,14 @@ public class GrpcInvoices {
.toList());
}
public Stream<SettledInvoice> getNewSettledInvoicesAfter(long settleIndexOffset) {
return grpcService.subscribeToSettledInvoices(settleIndexOffset)
.map(this::toStream)
.orElse(Stream.of())
.map(this::toSettledInvoice)
.filter(SettledInvoice::isValid);
}
private SettledInvoice toSettledInvoice(Invoice lndInvoice) {
if (lndInvoice.getState() != Invoice.InvoiceState.SETTLED) {
return SettledInvoice.INVALID;
@@ -47,7 +63,7 @@ public class GrpcInvoices {
);
}
public int getLimit() {
return LIMIT;
private Stream<Invoice> toStream(Iterator<Invoice> iterator) {
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false);
}
}

View File

@@ -17,6 +17,8 @@ import lnrpc.ForwardingHistoryRequest;
import lnrpc.ForwardingHistoryResponse;
import lnrpc.GetInfoResponse;
import lnrpc.GetTransactionsRequest;
import lnrpc.Invoice;
import lnrpc.InvoiceSubscription;
import lnrpc.LightningGrpc;
import lnrpc.ListChannelsRequest;
import lnrpc.ListInvoiceRequest;
@@ -34,6 +36,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
@@ -155,6 +158,13 @@ public class GrpcService extends GrpcBase {
return get(() -> lightningStub.listInvoices(request));
}
@Timed
public Optional<Iterator<Invoice>> subscribeToSettledInvoices(long settleIndex) {
return get(() -> lightningStub.subscribeInvoices(InvoiceSubscription.newBuilder()
.setSettleIndex(settleIndex)
.build()));
}
@Timed
public List<Peer> listPeersWithoutCache() {
return get(() -> lightningStub.listPeers(ListPeersRequest.getDefaultInstance()).getPeersList())

View File

@@ -4,6 +4,7 @@ import com.google.protobuf.ByteString;
import de.cotto.lndmanagej.model.SettledInvoice;
import lnrpc.Invoice;
import lnrpc.ListInvoiceResponse;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
@@ -12,6 +13,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
import javax.annotation.Nullable;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.HexFormat;
import java.util.List;
import java.util.Optional;
@@ -19,6 +21,7 @@ import java.util.Optional;
import static de.cotto.lndmanagej.model.SettledInvoiceFixtures.SETTLED_INVOICE;
import static de.cotto.lndmanagej.model.SettledInvoiceFixtures.SETTLED_INVOICE_2;
import static lnrpc.Invoice.InvoiceState.ACCEPTED;
import static lnrpc.Invoice.InvoiceState.CANCELED;
import static lnrpc.Invoice.InvoiceState.OPEN;
import static lnrpc.Invoice.InvoiceState.SETTLED;
import static org.assertj.core.api.Assertions.assertThat;
@@ -30,7 +33,6 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class GrpcInvoicesTest {
private static final long OFFSET = 123;
private static final int LIMIT = 1_000;
private static final HexFormat HEX_FORMAT = HexFormat.of();
@@ -40,53 +42,108 @@ class GrpcInvoicesTest {
@Mock
private GrpcService grpcService;
@Test
void empty_optional() {
when(grpcService.getInvoices(anyLong(), anyInt())).thenReturn(Optional.empty());
assertThat(grpcInvoices.getSettledInvoicesAfter(0L)).isEmpty();
@Nested
class BulkGet {
private static final long ADD_INDEX_OFFSET = 123;
@Test
void empty_optional() {
when(grpcService.getInvoices(anyLong(), anyInt())).thenReturn(Optional.empty());
assertThat(grpcInvoices.getSettledInvoicesAfter(0L)).isEmpty();
}
@Test
void no_invoice() {
ListInvoiceResponse response = ListInvoiceResponse.newBuilder().build();
when(grpcService.getInvoices(anyLong(), anyInt())).thenReturn(Optional.of(response));
assertThat(grpcInvoices.getSettledInvoicesAfter(0L)).contains(List.of());
}
@Test
void with_events() {
ListInvoiceResponse response = ListInvoiceResponse.newBuilder()
.addInvoices(invoice(SETTLED, SETTLED_INVOICE))
.addInvoices(invoice(SETTLED, SETTLED_INVOICE_2))
.build();
when(grpcService.getInvoices(anyLong(), anyInt())).thenReturn(Optional.of(response));
assertThat(grpcInvoices.getSettledInvoicesAfter(0L)).contains(
List.of(SETTLED_INVOICE, SETTLED_INVOICE_2)
);
}
@Test
void returns_non_settled_invoices_as_invalid() {
ListInvoiceResponse response = ListInvoiceResponse.newBuilder()
.addInvoices(invoice(ACCEPTED, null))
.addInvoices(invoice(OPEN, null))
.build();
when(grpcService.getInvoices(anyLong(), anyInt())).thenReturn(Optional.of(response));
assertThat(grpcInvoices.getSettledInvoicesAfter(0L)).contains(
List.of(SettledInvoice.INVALID, SettledInvoice.INVALID)
);
}
@Test
void starts_at_the_beginning() {
grpcInvoices.getSettledInvoicesAfter(ADD_INDEX_OFFSET);
verify(grpcService).getInvoices(eq(ADD_INDEX_OFFSET), anyInt());
}
@Test
void uses_limit() {
grpcInvoices.getSettledInvoicesAfter(ADD_INDEX_OFFSET);
verify(grpcService).getInvoices(ADD_INDEX_OFFSET, LIMIT);
}
}
@Test
void no_invoice() {
ListInvoiceResponse response = ListInvoiceResponse.newBuilder().build();
when(grpcService.getInvoices(anyLong(), anyInt())).thenReturn(Optional.of(response));
assertThat(grpcInvoices.getSettledInvoicesAfter(0L)).contains(List.of());
}
@Nested
class Subscription {
private static final long SETTLE_INDEX_OFFSET = 999;
@Test
void with_events() {
ListInvoiceResponse response = ListInvoiceResponse.newBuilder()
.addInvoices(invoice(SETTLED, SETTLED_INVOICE))
.addInvoices(invoice(SETTLED, SETTLED_INVOICE_2))
.build();
when(grpcService.getInvoices(anyLong(), anyInt())).thenReturn(Optional.of(response));
assertThat(grpcInvoices.getSettledInvoicesAfter(0L)).contains(
List.of(SETTLED_INVOICE, SETTLED_INVOICE_2)
);
}
@Test
void empty() {
when(grpcService.subscribeToSettledInvoices(anyLong())).thenReturn(Optional.empty());
assertThat(grpcInvoices.getNewSettledInvoicesAfter(SETTLE_INDEX_OFFSET)).isEmpty();
}
@Test
void returns_non_settled_invoices_as_invalid() {
ListInvoiceResponse response = ListInvoiceResponse.newBuilder()
.addInvoices(invoice(ACCEPTED, null))
.addInvoices(invoice(OPEN, null))
.build();
when(grpcService.getInvoices(anyLong(), anyInt())).thenReturn(Optional.of(response));
assertThat(grpcInvoices.getSettledInvoicesAfter(0L)).contains(
List.of(SettledInvoice.INVALID, SettledInvoice.INVALID)
);
}
@Test
void one_invoice() {
when(grpcService.subscribeToSettledInvoices(SETTLE_INDEX_OFFSET))
.thenReturn(Optional.of(List.of(invoice(SETTLED, SETTLED_INVOICE)).iterator()));
assertThat(grpcInvoices.getNewSettledInvoicesAfter(SETTLE_INDEX_OFFSET)).containsExactly(SETTLED_INVOICE);
}
@Test
void starts_at_the_beginning() {
grpcInvoices.getSettledInvoicesAfter(OFFSET);
verify(grpcService).getInvoices(eq(OFFSET), anyInt());
}
@Test
void two_invoices() {
List<Invoice> invoices = List.of(
invoice(SETTLED, SETTLED_INVOICE),
invoice(SETTLED, SETTLED_INVOICE_2)
);
when(grpcService.subscribeToSettledInvoices(SETTLE_INDEX_OFFSET))
.thenReturn(Optional.of(invoices.iterator()));
assertThat(grpcInvoices.getNewSettledInvoicesAfter(SETTLE_INDEX_OFFSET))
.containsExactly(SETTLED_INVOICE, SETTLED_INVOICE_2);
}
@Test
void uses_limit() {
grpcInvoices.getSettledInvoicesAfter(OFFSET);
verify(grpcService).getInvoices(OFFSET, LIMIT);
@Test
void ignores_invalid_invoice() {
List<Invoice> invoices = List.of(
invoice(SETTLED, SETTLED_INVOICE_2),
invoice(CANCELED, null),
invoice(SETTLED, SETTLED_INVOICE)
);
when(grpcService.subscribeToSettledInvoices(SETTLE_INDEX_OFFSET))
.thenReturn(Optional.of(invoices.iterator()));
assertThat(grpcInvoices.getNewSettledInvoicesAfter(SETTLE_INDEX_OFFSET))
.containsExactly(SETTLED_INVOICE_2, SETTLED_INVOICE);
}
@Test
void empty_iterator() {
when(grpcService.subscribeToSettledInvoices(SETTLE_INDEX_OFFSET))
.thenReturn(Optional.of(Collections.emptyIterator()));
assertThat(grpcInvoices.getNewSettledInvoicesAfter(SETTLE_INDEX_OFFSET)).isEmpty();
}
}
@Test

View File

@@ -15,6 +15,21 @@ class SettledInvoicesRepositoryIT {
@Autowired
private SettledInvoicesRepository repository;
@Test
void getMaxSettledIndex_no_invoice() {
assertThat(repository.getMaxSettledIndex()).isEqualTo(0);
}
@Test
void getMaxSettledIndex_with_gaps_in_addIndex() {
repository.save(invoice(1, 1));
repository.save(invoice(2, 2));
repository.save(invoice(5, 3));
repository.save(invoice(6, 5));
repository.save(invoice(7, 4));
assertThat(repository.getMaxSettledIndex()).isEqualTo(5);
}
@Test
void getMaxAddIndexWithoutGaps_no_invoice() {
assertThat(repository.getMaxAddIndexWithoutGaps()).isEqualTo(0);

View File

@@ -24,8 +24,12 @@ public class SettledInvoices {
public void refresh() {
List<SettledInvoice> settledInvoices;
do {
settledInvoices = grpcInvoices.getSettledInvoicesAfter(dao.getOffset()).orElse(List.of());
settledInvoices = grpcInvoices.getSettledInvoicesAfter(dao.getAddIndexOffset()).orElse(List.of());
dao.save(settledInvoices.stream().filter(SettledInvoice::isValid).collect(toList()));
} while (settledInvoices.size() == grpcInvoices.getLimit());
grpcInvoices.getNewSettledInvoicesAfter(dao.getSettleIndexOffset())
.filter(SettledInvoice::isValid)
.forEach(dao::save);
}
}

View File

@@ -7,5 +7,9 @@ import java.util.Collection;
public interface SettledInvoicesDao {
void save(Collection<SettledInvoice> settledInvoices);
long getOffset();
void save(SettledInvoice settledInvoice);
long getAddIndexOffset();
long getSettleIndexOffset();
}

View File

@@ -28,7 +28,17 @@ public class SettledInvoicesDaoImpl implements SettledInvoicesDao {
}
@Override
public long getOffset() {
public void save(SettledInvoice settledInvoice) {
repository.save(SettledInvoiceJpaDto.createFromInvoice(settledInvoice));
}
@Override
public long getAddIndexOffset() {
return repository.getMaxAddIndexWithoutGaps();
}
@Override
public long getSettleIndexOffset() {
return repository.getMaxSettledIndex();
}
}

View File

@@ -7,4 +7,7 @@ public interface SettledInvoicesRepository extends JpaRepository<SettledInvoiceJ
@Query("SELECT coalesce(max(i.addIndex), 0) FROM SettledInvoiceJpaDto i WHERE " +
"i.settleIndex = (SELECT COUNT(j) FROM SettledInvoiceJpaDto j WHERE j.settleIndex <= i.settleIndex)")
long getMaxAddIndexWithoutGaps();
@Query("SELECT coalesce(max(settleIndex), 0) FROM SettledInvoiceJpaDto")
long getMaxSettledIndex();
}

View File

@@ -5,21 +5,27 @@ import de.cotto.lndmanagej.model.SettledInvoice;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InOrder;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import static de.cotto.lndmanagej.model.SettledInvoiceFixtures.SETTLED_INVOICE;
import static de.cotto.lndmanagej.model.SettledInvoiceFixtures.SETTLED_INVOICE_2;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class SettledInvoicesTest {
private static final long OFFSET = 1_234;
private static final long ADD_INDEX_OFFSET = 1_234;
private static final long SETTLE_INDEX_OFFSET = 999;
@InjectMocks
private SettledInvoices settledInvoices;
@@ -32,19 +38,20 @@ class SettledInvoicesTest {
@BeforeEach
void setUp() {
when(dao.getOffset()).thenReturn(OFFSET);
when(dao.getAddIndexOffset()).thenReturn(ADD_INDEX_OFFSET);
when(dao.getSettleIndexOffset()).thenReturn(SETTLE_INDEX_OFFSET);
when(grpcInvoices.getLimit()).thenReturn(1);
}
@Test
void refresh_uses_known_offset() {
settledInvoices.refresh();
verify(grpcInvoices).getSettledInvoicesAfter(OFFSET);
verify(grpcInvoices).getSettledInvoicesAfter(ADD_INDEX_OFFSET);
}
@Test
void refresh_saves_invoices() {
when(grpcInvoices.getSettledInvoicesAfter(OFFSET)).thenReturn(
when(grpcInvoices.getSettledInvoicesAfter(ADD_INDEX_OFFSET)).thenReturn(
Optional.of(List.of(SETTLED_INVOICE, SETTLED_INVOICE_2))
).thenReturn(Optional.of(List.of()));
settledInvoices.refresh();
@@ -52,8 +59,16 @@ class SettledInvoicesTest {
}
@Test
void refresh_ignores_invalid_invoices() {
when(grpcInvoices.getSettledInvoicesAfter(OFFSET)).thenReturn(
void refresh_subscribes_after_bulk_get() {
settledInvoices.refresh();
InOrder inOrder = inOrder(grpcInvoices);
inOrder.verify(grpcInvoices).getSettledInvoicesAfter(ADD_INDEX_OFFSET);
inOrder.verify(grpcInvoices).getNewSettledInvoicesAfter(SETTLE_INDEX_OFFSET);
}
@Test
void refresh_ignores_invalid_invoices_in_bulk_get() {
when(grpcInvoices.getSettledInvoicesAfter(ADD_INDEX_OFFSET)).thenReturn(
Optional.of(List.of(SETTLED_INVOICE, SettledInvoice.INVALID, SETTLED_INVOICE_2, SettledInvoice.INVALID))
).thenReturn(Optional.of(List.of()));
settledInvoices.refresh();
@@ -62,7 +77,7 @@ class SettledInvoicesTest {
@Test
void refresh_repeats_while_at_limit() {
when(grpcInvoices.getSettledInvoicesAfter(OFFSET))
when(grpcInvoices.getSettledInvoicesAfter(ADD_INDEX_OFFSET))
.thenReturn(Optional.of(List.of(SETTLED_INVOICE)))
.thenReturn(Optional.of(List.of(SETTLED_INVOICE_2)))
.thenReturn(Optional.of(List.of()));
@@ -70,4 +85,21 @@ class SettledInvoicesTest {
verify(dao).save(List.of(SETTLED_INVOICE));
verify(dao).save(List.of(SETTLED_INVOICE_2));
}
@Test
void refresh_saves_invoices_from_subscription() {
when(grpcInvoices.getNewSettledInvoicesAfter(SETTLE_INDEX_OFFSET))
.thenReturn(Stream.of(SETTLED_INVOICE, SETTLED_INVOICE_2));
settledInvoices.refresh();
verify(dao).save(SETTLED_INVOICE);
verify(dao).save(SETTLED_INVOICE_2);
}
@Test
void refresh_skips_invalid_invoices_from_subscription() {
when(grpcInvoices.getNewSettledInvoicesAfter(SETTLE_INDEX_OFFSET))
.thenReturn(Stream.of(SETTLED_INVOICE, SettledInvoice.INVALID, SETTLED_INVOICE_2));
settledInvoices.refresh();
verify(dao, times(2)).save(any(SettledInvoice.class));
}
}

View File

@@ -28,16 +28,35 @@ class SettledInvoicesDaoImplTest {
private SettledInvoicesRepository repository;
@Test
void getOffset_initially_0() {
void getSettleIndexOffset_initially_0() {
when(repository.getMaxSettledIndex()).thenReturn(0L);
assertThat(dao.getSettleIndexOffset()).isEqualTo(0);
}
@Test
void getSettleIndexOffset() {
long expectedOffset = 123;
when(repository.getMaxSettledIndex()).thenReturn(expectedOffset);
assertThat(dao.getSettleIndexOffset()).isEqualTo(expectedOffset);
}
@Test
void getAddIndexOffset_initially_0() {
when(repository.getMaxAddIndexWithoutGaps()).thenReturn(0L);
assertThat(dao.getOffset()).isEqualTo(0);
assertThat(dao.getAddIndexOffset()).isEqualTo(0);
}
@Test
void getMaxAddIndexWithoutGaps() {
long expectedOffset = 123;
when(repository.getMaxAddIndexWithoutGaps()).thenReturn(expectedOffset);
assertThat(dao.getOffset()).isEqualTo(expectedOffset);
assertThat(dao.getAddIndexOffset()).isEqualTo(expectedOffset);
}
@Test
void save_single() {
dao.save(SETTLED_INVOICE);
verify(repository).save(argThat(jpaDto -> jpaDto.getMemo().equals(SETTLED_INVOICE.memo())));
}
@Test