Initial approach to reconnect behavior.

This commit is contained in:
Lucas Rouckhout
2023-05-30 16:14:05 +02:00
parent 2d40875685
commit df5a1fcfce
2 changed files with 139 additions and 36 deletions

View File

@@ -2,42 +2,106 @@ package rabbitmq
import ( import (
"context" "context"
"os"
"sync"
"time"
"github.com/labstack/gommon/log"
amqp "github.com/rabbitmq/amqp091-go" amqp "github.com/rabbitmq/amqp091-go"
"github.com/ziflex/lecho/v3"
)
const (
defaultHeartbeat = 10 * time.Second
defaultLocale = "en_US"
) )
type AMQPClient interface { type AMQPClient interface {
Listen(ctx context.Context, exchange string, routingKey string, queueName string, options ...AMQPListenOptions) (<-chan amqp.Delivery, error) Listen(ctx context.Context, exchange string, routingKey string, queueName string, options ...AMQPListenOptions) (<-chan amqp.Delivery, error)
PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
Close() error Close() error
} }
type defaultAMQPCLient struct { type defaultAMQPCLient struct {
conn *amqp.Connection conn *amqp.Connection
uri string
// It is recommended that, when possible, publishers and consumers // It is recommended that, when possible, publishers and consumers
// use separate connections so that consumers are isolated from potential // use separate connections so that consumers are isolated from potential
// flow control measures that may be applied to publishing connections. // flow control measures that may be applied to publishing connections.
consumeChannel *amqp.Channel consumeChannel *amqp.Channel
publishChannel *amqp.Channel publishChannel *amqp.Channel
notifyCloseChan chan *amqp.Error
logger *lecho.Logger
} }
func (c *defaultAMQPCLient) Close() error { return c.conn.Close() } type DialOption = func(amqp.Config) amqp.Config
func DialAMQP(uri string) (AMQPClient, error) {
client := &defaultAMQPCLient{
uri: uri,
logger: lecho.New(
os.Stdout,
lecho.WithLevel(log.DEBUG),
lecho.WithTimestamp(),
),
}
err := client.connect()
return client, err
}
func (c *defaultAMQPCLient) connect() error {
conn, err := amqp.DialConfig(c.uri, amqp.Config{
Heartbeat: defaultHeartbeat,
Locale: defaultLocale,
Dial: amqp.DefaultDial(time.Second * 3),
})
if err != nil {
return err
}
consumeChannel, err := conn.Channel()
if err != nil {
return err
}
publishChannel, err := conn.Channel()
if err != nil {
return err
}
notifyCloseChan := make(chan *amqp.Error)
conn.NotifyClose(notifyCloseChan)
c.conn = conn
c.consumeChannel = consumeChannel
c.publishChannel = publishChannel
c.notifyCloseChan = notifyCloseChan
return nil
}
func (c *defaultAMQPCLient) Close() error {
close(c.notifyCloseChan)
return c.conn.Close()
}
func (c *defaultAMQPCLient) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error { func (c *defaultAMQPCLient) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error {
// TODO: Seperate management channel? Or provide way to select channel? // For now we simply create a short lived channel. If this proves to be a bad approach we can either create a management channel
ch, err := c.conn.Channel() // at client create time, or use either the consumer/publishing channels that already exist.
if err != nil { ch, err := c.conn.Channel()
return err if err != nil {
} return err
defer ch.Close() }
defer ch.Close()
return ch.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, args)
return ch.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, args)
} }
type ListenOptions struct { type ListenOptions struct {
Durable bool Durable bool
AutoDelete bool AutoDelete bool
@@ -92,6 +156,66 @@ func WithAutoAck(autoAck bool) AMQPListenOptions {
} }
func (c *defaultAMQPCLient) Listen(ctx context.Context, exchange string, routingKey string, queueName string, options ...AMQPListenOptions) (<-chan amqp.Delivery, error) { func (c *defaultAMQPCLient) Listen(ctx context.Context, exchange string, routingKey string, queueName string, options ...AMQPListenOptions) (<-chan amqp.Delivery, error) {
deliveries, err := c.consume(ctx, exchange, routingKey, queueName, options...)
if err != nil {
return nil, err
}
clientChannel := make(chan amqp.Delivery)
// This routine functions as a wrapper arround the "raw" delivery channel.
// The happy-path of the select statement, i.e. the last one, is to simply
// pass on the message we get from the actual amqp channel. If however, an
// error is send over the NotifyClose channel it means we must try to
// reconnect if the error is Recoverable. In the meantime the client using
// the Listen function is non the wiser that this happened. A successful
// reconnect will make sure we recieve message from a new "raw" delivery
// channel on the next loop we simply keep sending new messages to the
// client channel using this new underlying connection/channel.
go func() {
for {
select {
case <-ctx.Done():
c.Close()
return
case amqpError := <-c.notifyCloseChan:
c.logger.Error(amqpError.Error())
if !amqpError.Recover {
c.Close()
return
}
c.logger.Info("amqp: trying to reconnect...")
err := c.connect()
if err != nil {
c.logger.Error(err)
c.Close()
return
}
d, err := c.consume(ctx, exchange, routingKey, queueName, options...)
if err != nil {
c.logger.Error(err)
c.Close()
return
}
deliveries = d
case delivery := <-deliveries:
clientChannel <- delivery
}
}
}()
return clientChannel, nil
}
func (c *defaultAMQPCLient) consume(ctx context.Context, exchange string, routingKey string, queueName string, options ...AMQPListenOptions) (<-chan amqp.Delivery, error) {
opts := ListenOptions{ opts := ListenOptions{
Durable: true, Durable: true,
AutoDelete: false, AutoDelete: false,
@@ -168,28 +292,6 @@ func (c *defaultAMQPCLient) Listen(ctx context.Context, exchange string, routing
} }
func (c *defaultAMQPCLient) PublishWithContext(ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing) error { func (c *defaultAMQPCLient) PublishWithContext(ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing) error {
// TODO: Think about race condition here. When a connection retry is in progress the publishing channel will get reassigned as well.
return c.publishChannel.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg) return c.publishChannel.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg)
} }
func DialAMQP(uri string) (AMQPClient, error) {
conn, err := amqp.Dial(uri)
if err != nil {
return nil, err
}
consumeChannel, err := conn.Channel()
if err != nil {
return nil, err
}
publishChannel, err := conn.Channel()
if err != nil {
return nil, err
}
return &defaultAMQPCLient{
conn,
consumeChannel,
publishChannel,
}, nil
}

View File

@@ -256,11 +256,12 @@ func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler
return err return err
} }
client.logger.Info("Starting RabbitMQ consumer loop") client.logger.Info("Starting RabbitMQ invoice consumer loop")
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return context.Canceled return context.Canceled
case delivery, ok := <-deliveryChan: case delivery, ok := <-deliveryChan:
if !ok { if !ok {
return fmt.Errorf("Disconnected from RabbitMQ") return fmt.Errorf("Disconnected from RabbitMQ")