Add GetTransactionsStream RPC (#345)

* RPC GetPaymentsStream

This introduces a new feature to the ArkService API that allows clients to subscribe to payment events. Here's a breakdown of the changes:

1. **OpenAPI Specification (`service.swagger.json`):**
   - A new endpoint `/v1/payments` is added to the API, supporting a `GET` operation for streaming payment events.
   - New definitions `v1GetPaymentsStreamResponse`, `v1RoundPayment`, and `v1AsyncPayment` are added to describe the structure of the streaming responses.

2. **Protobuf Definition (`service.proto`):**
   - Added a new RPC method `GetPaymentsStream` that streams `GetPaymentsStreamResponse` messages.
   - Defined new message types: `GetPaymentsStreamRequest`, `GetPaymentsStreamResponse`, `RoundPayment`, and `AsyncPayment`.

3. **Generated Protobuf Code (`service.pb.go`, `service.pb.gw.go`, `service_grpc.pb.go`):**
   - The generated code is updated to include the new RPC method and message types.
   - The gateway code includes functions to handle HTTP requests and responses for the new streaming endpoint.

4. **Application Logic (`covenant.go`, `covenantless.go`):**
   - New payment events channels are introduced (`paymentEventsCh`).
   - Payment events are propagated to these channels when a round is finalized or an async payment is completed.
   - New event types `RoundPaymentEvent` and `AsyncPaymentEvent` are defined, implementing a `PaymentEvent` interface.

5. **gRPC Handlers (`arkservice.go`):**
   - Added logic to handle `GetPaymentsStream` requests and manage payment listeners.
   - A new goroutine is started to listen to payment events and forward them to active listeners.

Overall, this patch extends the ArkService to support real-time streaming of payment events, allowing clients to receive updates on both round payments and async payments as they occur.

* Move emit events in updateVtxoSet & Use generics and parsers (#1)

* Move sending event to updateVtxoSet

* Use generics and parsers

* pr review refactor

* pr review refactor

* fix

---------

Co-authored-by: Pietralberto Mazza <18440657+altafan@users.noreply.github.com>
This commit is contained in:
Dusan Sekulic
2024-10-04 12:23:16 +02:00
committed by GitHub
parent 26bcbc8163
commit d37af7daf5
10 changed files with 996 additions and 206 deletions

View File

@@ -15,27 +15,22 @@ import (
"google.golang.org/grpc/status"
)
type listener struct {
id string
done chan struct{}
ch chan *arkv1.GetEventStreamResponse
}
type handler struct {
svc application.Service
listenersLock *sync.Mutex
listeners []*listener
eventsListenerHandler *listenerHanlder[*arkv1.GetEventStreamResponse]
transactionsListenerHandler *listenerHanlder[*arkv1.GetTransactionsStreamResponse]
}
func NewHandler(service application.Service) arkv1.ArkServiceServer {
h := &handler{
svc: service,
listenersLock: &sync.Mutex{},
listeners: make([]*listener, 0),
svc: service,
eventsListenerHandler: newListenerHandler[*arkv1.GetEventStreamResponse](),
transactionsListenerHandler: newListenerHandler[*arkv1.GetTransactionsStreamResponse](),
}
go h.listenToEvents()
go h.listenToPaymentEvents()
return h
}
@@ -234,14 +229,14 @@ func (h *handler) GetEventStream(
) error {
doneCh := make(chan struct{})
listener := &listener{
listener := &listener[*arkv1.GetEventStreamResponse]{
id: uuid.NewString(),
done: doneCh,
ch: make(chan *arkv1.GetEventStreamResponse),
}
h.pushListener(listener)
defer h.removeListener(listener.id)
h.eventsListenerHandler.pushListener(listener)
defer h.eventsListenerHandler.removeListener(listener.id)
defer close(listener.ch)
defer close(doneCh)
@@ -488,21 +483,31 @@ func (h *handler) ListVtxos(
}, nil
}
func (h *handler) pushListener(l *listener) {
h.listenersLock.Lock()
defer h.listenersLock.Unlock()
func (h *handler) GetTransactionsStream(
_ *arkv1.GetTransactionsStreamRequest,
stream arkv1.ArkService_GetTransactionsStreamServer,
) error {
listener := &listener[*arkv1.GetTransactionsStreamResponse]{
id: uuid.NewString(),
done: make(chan struct{}),
ch: make(chan *arkv1.GetTransactionsStreamResponse),
}
h.listeners = append(h.listeners, l)
}
h.transactionsListenerHandler.pushListener(listener)
func (h *handler) removeListener(id string) {
h.listenersLock.Lock()
defer h.listenersLock.Unlock()
defer func() {
h.transactionsListenerHandler.removeListener(listener.id)
close(listener.ch)
}()
for i, listener := range h.listeners {
if listener.id == id {
h.listeners = append(h.listeners[:i], h.listeners[i+1:]...)
return
for {
select {
case <-stream.Context().Done():
return nil
case ev := <-listener.ch:
if err := stream.Send(ev); err != nil {
return err
}
}
}
}
@@ -582,9 +587,9 @@ func (h *handler) listenToEvents() {
}
if ev != nil {
logrus.Debugf("forwarding event to %d listeners", len(h.listeners))
for _, l := range h.listeners {
go func(l *listener) {
logrus.Debugf("forwarding event to %d listeners", len(h.eventsListenerHandler.listeners))
for _, l := range h.eventsListenerHandler.listeners {
go func(l *listener[*arkv1.GetEventStreamResponse]) {
l.ch <- ev
if shouldClose {
l.done <- struct{}{}
@@ -594,3 +599,88 @@ func (h *handler) listenToEvents() {
}
}
}
func (h *handler) listenToPaymentEvents() {
paymentEventsCh := h.svc.GetTransactionEventsChannel(context.Background())
for event := range paymentEventsCh {
var paymentEvent *arkv1.GetTransactionsStreamResponse
switch event.Type() {
case application.RoundTransaction:
paymentEvent = &arkv1.GetTransactionsStreamResponse{
Tx: &arkv1.GetTransactionsStreamResponse_Round{
Round: convertRoundPaymentEvent(event.(application.RoundTransactionEvent)),
},
}
case application.RedeemTransaction:
paymentEvent = &arkv1.GetTransactionsStreamResponse{
Tx: &arkv1.GetTransactionsStreamResponse_Redeem{
Redeem: convertAsyncPaymentEvent(event.(application.RedeemTransactionEvent)),
},
}
}
if paymentEvent != nil {
logrus.Debugf("forwarding event to %d listeners", len(h.transactionsListenerHandler.listeners))
for _, l := range h.transactionsListenerHandler.listeners {
go func(l *listener[*arkv1.GetTransactionsStreamResponse]) {
l.ch <- paymentEvent
}(l)
}
}
}
}
func convertRoundPaymentEvent(e application.RoundTransactionEvent) *arkv1.RoundTransaction {
return &arkv1.RoundTransaction{
Txid: e.RoundTxID,
SpentVtxos: vtxoKeyList(e.SpentVtxos).toProto(),
SpendableVtxos: vtxoList(e.SpendableVtxos).toProto(),
ClaimedBoardingUtxos: vtxoKeyList(e.ClaimedBoardingInputs).toProto(),
}
}
func convertAsyncPaymentEvent(e application.RedeemTransactionEvent) *arkv1.RedeemTransaction {
return &arkv1.RedeemTransaction{
Txid: e.AsyncTxID,
SpentVtxos: vtxoKeyList(e.SpentVtxos).toProto(),
SpendableVtxos: vtxoList(e.SpendableVtxos).toProto(),
}
}
type listener[T any] struct {
id string
done chan struct{}
ch chan T
}
type listenerHanlder[T any] struct {
lock *sync.Mutex
listeners []*listener[T]
}
func newListenerHandler[T any]() *listenerHanlder[T] {
return &listenerHanlder[T]{
lock: &sync.Mutex{},
listeners: make([]*listener[T], 0),
}
}
func (h *listenerHanlder[T]) pushListener(l *listener[T]) {
h.lock.Lock()
defer h.lock.Unlock()
h.listeners = append(h.listeners, l)
}
func (h *listenerHanlder[T]) removeListener(id string) {
h.lock.Lock()
defer h.lock.Unlock()
for i, listener := range h.listeners {
if listener.id == id {
h.listeners = append(h.listeners[:i], h.listeners[i+1:]...)
return
}
}
}

View File

@@ -92,6 +92,19 @@ func (v vtxoList) toProto() []*arkv1.Vtxo {
return list
}
type vtxoKeyList []domain.VtxoKey
func (v vtxoKeyList) toProto() []*arkv1.Outpoint {
list := make([]*arkv1.Outpoint, 0, len(v))
for _, vtxoKey := range v {
list = append(list, &arkv1.Outpoint{
Txid: vtxoKey.Txid,
Vout: vtxoKey.VOut,
})
}
return list
}
type congestionTree tree.CongestionTree
func (t congestionTree) toProto() *arkv1.Tree {