From 3d7b73560a06db6c824ac972c29ec88ab905fb1a Mon Sep 17 00:00:00 2001 From: Bernhard B Date: Mon, 11 Dec 2023 22:18:23 +0100 Subject: [PATCH] fixed bug in golang channel handling (json-rpc mode) * golang channels are meant to be 1:1 channels, so if multiple goroutines listen on the same channel for messages, only one will receive the message and the others are not, which lead to lost messages. In order to fix that, we create a dedicated golang channel for every websocket connection. see #451 --- src/api/api.go | 3 ++- src/client/client.go | 14 +++++++++++--- src/client/jsonrpc2.go | 36 +++++++++++++++++++++++++----------- 3 files changed, 38 insertions(+), 15 deletions(-) diff --git a/src/api/api.go b/src/api/api.go index 4cb083c..34a7c3e 100644 --- a/src/api/api.go +++ b/src/api/api.go @@ -390,7 +390,7 @@ func (a *Api) SendV2(c *gin.Context) { } func (a *Api) handleSignalReceive(ws *websocket.Conn, number string, stop chan struct{}) { - receiveChannel, err := a.signalClient.GetReceiveChannel() + receiveChannel, channelUuid, err := a.signalClient.GetReceiveChannel() if err != nil { log.Error("Couldn't get receive channel: ", err.Error()) return @@ -399,6 +399,7 @@ func (a *Api) handleSignalReceive(ws *websocket.Conn, number string, stop chan s for { select { case <-stop: + a.signalClient.RemoveReceiveChannel(channelUuid) ws.Close() return case msg := <-receiveChannel: diff --git a/src/client/client.go b/src/client/client.go index 839bc16..2039e19 100644 --- a/src/client/client.go +++ b/src/client/client.go @@ -730,12 +730,20 @@ func (s *SignalClient) Receive(number string, timeout int64, ignoreAttachments b } } -func (s *SignalClient) GetReceiveChannel() (chan JsonRpc2ReceivedMessage, error) { +func (s *SignalClient) GetReceiveChannel() (chan JsonRpc2ReceivedMessage, string, error) { jsonRpc2Client, err := s.getJsonRpc2Client() if err != nil { - return nil, err + return nil, "", err } - return jsonRpc2Client.GetReceiveChannel(), nil + return jsonRpc2Client.GetReceiveChannel() +} + +func (s *SignalClient) RemoveReceiveChannel(channelUuid string) { + jsonRpc2Client, err := s.getJsonRpc2Client() + if err != nil { + return + } + jsonRpc2Client.RemoveReceiveChannel(channelUuid) } func (s *SignalClient) CreateGroup(number string, name string, members []string, description string, editGroupPermission GroupPermission, addMembersPermission GroupPermission, groupLinkState GroupLinkState) (string, error) { diff --git a/src/client/jsonrpc2.go b/src/client/jsonrpc2.go index e6191a0..d5afda8 100644 --- a/src/client/jsonrpc2.go +++ b/src/client/jsonrpc2.go @@ -33,7 +33,7 @@ type JsonRpc2ReceivedMessage struct { type JsonRpc2Client struct { conn net.Conn receivedResponsesById map[string]chan JsonRpc2MessageResponse - receivedMessages chan JsonRpc2ReceivedMessage + receivedMessagesChannels map[string]chan JsonRpc2ReceivedMessage lastTimeErrorMessageSent time.Time signalCliApiConfig *utils.SignalCliApiConfig number string @@ -44,6 +44,7 @@ func NewJsonRpc2Client(signalCliApiConfig *utils.SignalCliApiConfig, number stri signalCliApiConfig: signalCliApiConfig, number: number, receivedResponsesById: make(map[string]chan JsonRpc2MessageResponse), + receivedMessagesChannels: make(map[string]chan JsonRpc2ReceivedMessage), } } @@ -54,8 +55,6 @@ func (r *JsonRpc2Client) Dial(address string) error { return err } - r.receivedMessages = make(chan JsonRpc2ReceivedMessage) - return nil } @@ -143,13 +142,15 @@ func (r *JsonRpc2Client) ReceiveData(number string) { var resp1 JsonRpc2ReceivedMessage json.Unmarshal([]byte(str), &resp1) if resp1.Method == "receive" { - select { - case r.receivedMessages <- resp1: - log.Debug("Message sent to golang channel") - default: - log.Debug("Couldn't send message to golang channel, as there's no receiver") + for _, c := range r.receivedMessagesChannels { + select { + case c <- resp1: + log.Debug("Message sent to golang channel") + default: + log.Debug("Couldn't send message to golang channel, as there's no receiver") + } + continue } - continue } var resp2 JsonRpc2MessageResponse @@ -166,6 +167,19 @@ func (r *JsonRpc2Client) ReceiveData(number string) { } } -func (r *JsonRpc2Client) GetReceiveChannel() chan JsonRpc2ReceivedMessage { - return r.receivedMessages +func (r *JsonRpc2Client) GetReceiveChannel() (chan JsonRpc2ReceivedMessage, string, error) { + c := make(chan JsonRpc2ReceivedMessage) + + channelUuid, err := uuid.NewV4() + if err != nil { + return c, "", err + } + + r.receivedMessagesChannels[channelUuid.String()] = c + + return c, channelUuid.String(), nil +} + +func (r *JsonRpc2Client) RemoveReceiveChannel(channelUuid string) { + delete(r.receivedMessagesChannels, channelUuid) }