diff --git a/rabbitmq/amqp.go b/rabbitmq/amqp.go index 6fb7db8..66989d4 100644 --- a/rabbitmq/amqp.go +++ b/rabbitmq/amqp.go @@ -61,6 +61,7 @@ func DialAMQP(uri string) (AMQPClient, error) { reconFlag: atomic.Bool{}, } err := client.connect() + client.listeners = []chan listenerMsg{} go client.reconnectionLoop() @@ -95,8 +96,6 @@ func (c *defaultAMQPCLient) connect() error { c.publishChannel = publishChannel c.notifyCloseChan = notifyCloseChan - c.listeners = []chan listenerMsg{} - return nil } @@ -120,33 +119,11 @@ func (c *defaultAMQPCLient) reconnectionLoop() error { err := backoff.Retry(func() error { c.logger.Info("amqp: trying to reconnect...") - conn, err := amqp.DialConfig(c.uri, amqp.Config{ - Heartbeat: defaultHeartbeat, - Locale: defaultLocale, - Dial: amqp.DefaultDial(time.Second * 3), - }) + err := c.connect() if err != nil { return err } - consumeChannel, err := conn.Channel() - if err != nil { - return err - } - - publishChannel, err := conn.Channel() - if err != nil { - return err - } - - notifyCloseChan := make(chan *amqp.Error) - conn.NotifyClose(notifyCloseChan) - - c.conn = conn - c.consumeChannel = consumeChannel - c.publishChannel = publishChannel - c.notifyCloseChan = notifyCloseChan - c.logger.Info("amqp: succesfully reconnected") return nil @@ -272,7 +249,7 @@ func (c *defaultAMQPCLient) Listen(ctx context.Context, exchange string, routing return } - c.logger.Infof("amqp: succesfully consuming messages with routingkey: %s from new deliveries channel", routingKey) + c.logger.Infof("amqp: succesfully consuming messages with routingkey: %s from new deliveries channel", routingKey) deliveries = d case msgClose: