fix merge conflict

This commit is contained in:
kiwiidb
2023-07-14 14:13:31 +02:00
parent 358007318a
commit 3da5d0c9a1

View File

@@ -61,6 +61,7 @@ func DialAMQP(uri string) (AMQPClient, error) {
reconFlag: atomic.Bool{}, reconFlag: atomic.Bool{},
} }
err := client.connect() err := client.connect()
client.listeners = []chan listenerMsg{}
go client.reconnectionLoop() go client.reconnectionLoop()
@@ -95,8 +96,6 @@ func (c *defaultAMQPCLient) connect() error {
c.publishChannel = publishChannel c.publishChannel = publishChannel
c.notifyCloseChan = notifyCloseChan c.notifyCloseChan = notifyCloseChan
c.listeners = []chan listenerMsg{}
return nil return nil
} }
@@ -120,33 +119,11 @@ func (c *defaultAMQPCLient) reconnectionLoop() error {
err := backoff.Retry(func() error { err := backoff.Retry(func() error {
c.logger.Info("amqp: trying to reconnect...") c.logger.Info("amqp: trying to reconnect...")
conn, err := amqp.DialConfig(c.uri, amqp.Config{ err := c.connect()
Heartbeat: defaultHeartbeat,
Locale: defaultLocale,
Dial: amqp.DefaultDial(time.Second * 3),
})
if err != nil { if err != nil {
return err return err
} }
consumeChannel, err := conn.Channel()
if err != nil {
return err
}
publishChannel, err := conn.Channel()
if err != nil {
return err
}
notifyCloseChan := make(chan *amqp.Error)
conn.NotifyClose(notifyCloseChan)
c.conn = conn
c.consumeChannel = consumeChannel
c.publishChannel = publishChannel
c.notifyCloseChan = notifyCloseChan
c.logger.Info("amqp: succesfully reconnected") c.logger.Info("amqp: succesfully reconnected")
return nil return nil
@@ -272,7 +249,7 @@ func (c *defaultAMQPCLient) Listen(ctx context.Context, exchange string, routing
return return
} }
c.logger.Infof("amqp: succesfully consuming messages with routingkey: %s from new deliveries channel", routingKey) c.logger.Infof("amqp: succesfully consuming messages with routingkey: %s from new deliveries channel", routingKey)
deliveries = d deliveries = d
case msgClose: case msgClose: