Make binary decision; either use rabbit for everything or dont

This commit is contained in:
Lucas Rouckhout
2023-07-14 13:52:29 +02:00
parent d530ee6eb6
commit 8f6aee5750
4 changed files with 48 additions and 41 deletions

View File

@@ -16,8 +16,13 @@ import (
const (
defaultHeartbeat = 10 * time.Second
defaultLocale = "en_US"
msgReconnect = "RECONNECT_DONE"
msgClose = "CLOSE"
)
type listenerMsg = string
type AMQPClient interface {
Listen(ctx context.Context, exchange string, routingKey string, queueName string, options ...AMQPListenOptions) (<-chan amqp.Delivery, error)
PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
@@ -37,7 +42,7 @@ type defaultAMQPCLient struct {
notifyCloseChan chan *amqp.Error
listeners []chan interface{}
listeners []chan listenerMsg
reconFlag atomic.Bool
logger *lecho.Logger
@@ -90,7 +95,7 @@ func (c *defaultAMQPCLient) connect() error {
c.publishChannel = publishChannel
c.notifyCloseChan = notifyCloseChan
c.listeners = []chan interface{}{}
c.listeners = []chan listenerMsg{}
return nil
}
@@ -111,6 +116,7 @@ func (c *defaultAMQPCLient) reconnectionLoop() error {
expontentialBackoff.MaxElapsedTime = time.Minute
c.reconFlag.Store(true)
err := backoff.Retry(func() error {
c.logger.Info("amqp: trying to reconnect...")
@@ -145,14 +151,19 @@ func (c *defaultAMQPCLient) reconnectionLoop() error {
return nil
}, expontentialBackoff)
c.reconFlag.Store(false)
if err != nil {
for _, listener := range c.listeners {
listener <- msgClose
}
return err
}
for _, listener := range c.listeners {
listener <- "DONE"
listener <- msgReconnect
}
}
}
@@ -236,13 +247,13 @@ func (c *defaultAMQPCLient) Listen(ctx context.Context, exchange string, routing
clientChannel := make(chan amqp.Delivery)
notifyReconnectChan := make(chan interface{}, 2)
notifyReconnectChan := make(chan listenerMsg, 2)
c.listeners = append(c.listeners, notifyReconnectChan)
// This routine functions as a wrapper arround the "raw" delivery channel.
// The happy-path of the select statement, i.e. the last one, is to simply
// pass on the message we get from the actual amqp channel. If however, a
// message is passed on the notifyReconnectChan it means the recoonection
// message is passed on the notifyReconnectChan it means the reconnection
// loop was successful in reconnecting. Which means the listener should
// get a new deliveries channel from the new amqp channels that were made.
go func() {
@@ -251,18 +262,25 @@ func (c *defaultAMQPCLient) Listen(ctx context.Context, exchange string, routing
case <-ctx.Done():
return
case <-notifyReconnectChan:
d, err := c.consume(ctx, exchange, routingKey, queueName, options...)
if err != nil {
c.logger.Error(err)
case msg := <-notifyReconnectChan:
switch msg {
case msgReconnect:
d, err := c.consume(ctx, exchange, routingKey, queueName, options...)
if err != nil {
c.logger.Error(err)
return
return
}
c.logger.Info("succesfully consuming from new deliveries channel")
deliveries = d
case msgClose:
close(clientChannel)
default:
c.logger.Warnf("amqp: unrecognized message send to listener: %s", msg)
}
c.logger.Info("succesfully consuming from new deliveries channel")
deliveries = d
case delivery, ok := <-deliveries:
if ok {
clientChannel <- delivery