diff --git a/src/api/api.go b/src/api/api.go index ac154bc..e9d2e25 100644 --- a/src/api/api.go +++ b/src/api/api.go @@ -6,6 +6,7 @@ import ( "net/http" "strconv" "time" + "errors" "github.com/gabriel-vasile/mimetype" "github.com/gin-gonic/gin" @@ -332,32 +333,58 @@ func (a *Api) SendV2(c *gin.Context) { c.JSON(201, SendMessageResponse{Timestamp: strconv.FormatInt((*timestamps)[0].Timestamp, 10)}) } -func (a *Api) handleSignalReceive(ws *websocket.Conn, number string) { +func (a *Api) handleSignalReceive(ws *websocket.Conn, number string, stop chan struct{}) { + receiveChannel, err := a.signalClient.GetReceiveChannel(number) + if err != nil { + log.Error("Couldn't get receive channel: ", err.Error()) + return + } + for { - data, err := a.signalClient.Receive(number, 0) - if err == nil { - err = ws.WriteMessage(websocket.TextMessage, []byte(data)) - if err != nil { - log.Error("Couldn't write message: " + err.Error()) - return + select { + case <-stop: + ws.Close() + return + case msg := <-receiveChannel: + var data string = string(msg.Params) + var err error = nil + if msg.Err.Code != 0 { + err = errors.New(msg.Err.Message) } - } else { - errorMsg := Error{Msg: err.Error()} - errorMsgBytes, err := json.Marshal(errorMsg) - if err != nil { - log.Error("Couldn't serialize error message: " + err.Error()) - return - } - err = ws.WriteMessage(websocket.TextMessage, errorMsgBytes) - if err != nil { - log.Error("Couldn't write message: " + err.Error()) - return + + if err == nil { + if data != "" { + err = ws.WriteMessage(websocket.TextMessage, []byte(data)) + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + log.Error("Couldn't write message: " + err.Error()) + } + return + } + } + } else { + errorMsg := Error{Msg: err.Error()} + errorMsgBytes, err := json.Marshal(errorMsg) + if err != nil { + log.Error("Couldn't serialize error message: " + err.Error()) + return + } + err = ws.WriteMessage(websocket.TextMessage, errorMsgBytes) + if err != nil { + log.Error("Couldn't write message: " + err.Error()) + return + } } } } } -func wsPong(ws *websocket.Conn) { +func wsPong(ws *websocket.Conn, stop chan struct{}) { + defer func() { + close(stop) + ws.Close() + }() + ws.SetReadLimit(512) ws.SetPongHandler(func(string) error { log.Debug("Received pong"); return nil }) for { @@ -368,10 +395,13 @@ func wsPong(ws *websocket.Conn) { } } -func wsPing(ws *websocket.Conn) { +func wsPing(ws *websocket.Conn, stop chan struct{}) { pingTicker := time.NewTicker(pingPeriod) for { select { + case <-stop: + ws.Close() + return case <-pingTicker.C: if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil { return @@ -400,9 +430,10 @@ func (a *Api) Receive(c *gin.Context) { return } defer ws.Close() - go a.handleSignalReceive(ws, number) - go wsPing(ws) - wsPong(ws) + var stop = make(chan struct{}) + go a.handleSignalReceive(ws, number, stop) + go wsPing(ws, stop) + wsPong(ws, stop) } else { timeout := c.DefaultQuery("timeout", "1") timeoutInt, err := strconv.ParseInt(timeout, 10, 32) diff --git a/src/client/client.go b/src/client/client.go index 45a8f0f..7bdaa97 100644 --- a/src/client/client.go +++ b/src/client/client.go @@ -591,15 +591,7 @@ func (s *SignalClient) SendV2(number string, message string, recps []string, bas func (s *SignalClient) Receive(number string, timeout int64) (string, error) { if s.signalCliMode == JsonRpc { - jsonRpc2Client, err := s.getJsonRpc2Client(number) - if err != nil { - return "", err - } - msg := jsonRpc2Client.ReceiveMessage() - if msg.Err.Code != 0 { - return "", errors.New(msg.Err.Message) - } - return string(msg.Params), nil + return "", errors.New("Not implemented") } else { command := []string{"--config", s.signalCliConfig, "--output", "json", "-a", number, "receive", "-t", strconv.FormatInt(timeout, 10)} @@ -624,6 +616,14 @@ func (s *SignalClient) Receive(number string, timeout int64) (string, error) { } } +func (s *SignalClient) GetReceiveChannel(number string) (chan JsonRpc2ReceivedMessage, error) { + jsonRpc2Client, err := s.getJsonRpc2Client(number) + if err != nil { + return nil, err + } + return jsonRpc2Client.GetReceiveChannel(), nil +} + func (s *SignalClient) CreateGroup(number string, name string, members []string, description string, editGroupPermission GroupPermission, addMembersPermission GroupPermission, groupLinkState GroupLinkState) (string, error) { var internalGroupId string if s.signalCliMode == JsonRpc { diff --git a/src/client/jsonrpc2.go b/src/client/jsonrpc2.go index 5e8331b..aa4a073 100644 --- a/src/client/jsonrpc2.go +++ b/src/client/jsonrpc2.go @@ -133,8 +133,6 @@ func (r *JsonRpc2Client) ReceiveData(number string) { } } -//blocks until message a message is received -func (r *JsonRpc2Client) ReceiveMessage() JsonRpc2ReceivedMessage { - resp := <-r.receivedMessages - return resp +func (r *JsonRpc2Client) GetReceiveChannel() chan JsonRpc2ReceivedMessage { + return r.receivedMessages }