fixed bug in golang channel handling (json-rpc mode)

* golang channels are meant to be 1:1 channels, so if multiple
  goroutines listen on the same channel for messages, only one will
  receive the message and the others are not, which lead to lost
  messages.

  In order to fix that, we create a dedicated golang channel for every
  websocket connection.

see #451
This commit is contained in:
Bernhard B
2023-12-11 22:18:23 +01:00
parent 844f1d7b91
commit 3d7b73560a
3 changed files with 38 additions and 15 deletions

View File

@@ -390,7 +390,7 @@ func (a *Api) SendV2(c *gin.Context) {
} }
func (a *Api) handleSignalReceive(ws *websocket.Conn, number string, stop chan struct{}) { func (a *Api) handleSignalReceive(ws *websocket.Conn, number string, stop chan struct{}) {
receiveChannel, err := a.signalClient.GetReceiveChannel() receiveChannel, channelUuid, err := a.signalClient.GetReceiveChannel()
if err != nil { if err != nil {
log.Error("Couldn't get receive channel: ", err.Error()) log.Error("Couldn't get receive channel: ", err.Error())
return return
@@ -399,6 +399,7 @@ func (a *Api) handleSignalReceive(ws *websocket.Conn, number string, stop chan s
for { for {
select { select {
case <-stop: case <-stop:
a.signalClient.RemoveReceiveChannel(channelUuid)
ws.Close() ws.Close()
return return
case msg := <-receiveChannel: case msg := <-receiveChannel:

View File

@@ -730,12 +730,20 @@ func (s *SignalClient) Receive(number string, timeout int64, ignoreAttachments b
} }
} }
func (s *SignalClient) GetReceiveChannel() (chan JsonRpc2ReceivedMessage, error) { func (s *SignalClient) GetReceiveChannel() (chan JsonRpc2ReceivedMessage, string, error) {
jsonRpc2Client, err := s.getJsonRpc2Client() jsonRpc2Client, err := s.getJsonRpc2Client()
if err != nil { if err != nil {
return nil, err return nil, "", err
} }
return jsonRpc2Client.GetReceiveChannel(), nil return jsonRpc2Client.GetReceiveChannel()
}
func (s *SignalClient) RemoveReceiveChannel(channelUuid string) {
jsonRpc2Client, err := s.getJsonRpc2Client()
if err != nil {
return
}
jsonRpc2Client.RemoveReceiveChannel(channelUuid)
} }
func (s *SignalClient) CreateGroup(number string, name string, members []string, description string, editGroupPermission GroupPermission, addMembersPermission GroupPermission, groupLinkState GroupLinkState) (string, error) { func (s *SignalClient) CreateGroup(number string, name string, members []string, description string, editGroupPermission GroupPermission, addMembersPermission GroupPermission, groupLinkState GroupLinkState) (string, error) {

View File

@@ -33,7 +33,7 @@ type JsonRpc2ReceivedMessage struct {
type JsonRpc2Client struct { type JsonRpc2Client struct {
conn net.Conn conn net.Conn
receivedResponsesById map[string]chan JsonRpc2MessageResponse receivedResponsesById map[string]chan JsonRpc2MessageResponse
receivedMessages chan JsonRpc2ReceivedMessage receivedMessagesChannels map[string]chan JsonRpc2ReceivedMessage
lastTimeErrorMessageSent time.Time lastTimeErrorMessageSent time.Time
signalCliApiConfig *utils.SignalCliApiConfig signalCliApiConfig *utils.SignalCliApiConfig
number string number string
@@ -44,6 +44,7 @@ func NewJsonRpc2Client(signalCliApiConfig *utils.SignalCliApiConfig, number stri
signalCliApiConfig: signalCliApiConfig, signalCliApiConfig: signalCliApiConfig,
number: number, number: number,
receivedResponsesById: make(map[string]chan JsonRpc2MessageResponse), receivedResponsesById: make(map[string]chan JsonRpc2MessageResponse),
receivedMessagesChannels: make(map[string]chan JsonRpc2ReceivedMessage),
} }
} }
@@ -54,8 +55,6 @@ func (r *JsonRpc2Client) Dial(address string) error {
return err return err
} }
r.receivedMessages = make(chan JsonRpc2ReceivedMessage)
return nil return nil
} }
@@ -143,14 +142,16 @@ func (r *JsonRpc2Client) ReceiveData(number string) {
var resp1 JsonRpc2ReceivedMessage var resp1 JsonRpc2ReceivedMessage
json.Unmarshal([]byte(str), &resp1) json.Unmarshal([]byte(str), &resp1)
if resp1.Method == "receive" { if resp1.Method == "receive" {
for _, c := range r.receivedMessagesChannels {
select { select {
case r.receivedMessages <- resp1: case c <- resp1:
log.Debug("Message sent to golang channel") log.Debug("Message sent to golang channel")
default: default:
log.Debug("Couldn't send message to golang channel, as there's no receiver") log.Debug("Couldn't send message to golang channel, as there's no receiver")
} }
continue continue
} }
}
var resp2 JsonRpc2MessageResponse var resp2 JsonRpc2MessageResponse
err = json.Unmarshal([]byte(str), &resp2) err = json.Unmarshal([]byte(str), &resp2)
@@ -166,6 +167,19 @@ func (r *JsonRpc2Client) ReceiveData(number string) {
} }
} }
func (r *JsonRpc2Client) GetReceiveChannel() chan JsonRpc2ReceivedMessage { func (r *JsonRpc2Client) GetReceiveChannel() (chan JsonRpc2ReceivedMessage, string, error) {
return r.receivedMessages c := make(chan JsonRpc2ReceivedMessage)
channelUuid, err := uuid.NewV4()
if err != nil {
return c, "", err
}
r.receivedMessagesChannels[channelUuid.String()] = c
return c, channelUuid.String(), nil
}
func (r *JsonRpc2Client) RemoveReceiveChannel(channelUuid string) {
delete(r.receivedMessagesChannels, channelUuid)
} }