fixed bug in json-rpc mode

* properly close websocket connection when client disconnects

see #198
This commit is contained in:
Bernhard B
2022-01-16 22:00:12 +01:00
parent b64ee6bbb8
commit 284e92107c
3 changed files with 65 additions and 36 deletions

View File

@@ -6,6 +6,7 @@ import (
"net/http" "net/http"
"strconv" "strconv"
"time" "time"
"errors"
"github.com/gabriel-vasile/mimetype" "github.com/gabriel-vasile/mimetype"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@@ -332,32 +333,58 @@ func (a *Api) SendV2(c *gin.Context) {
c.JSON(201, SendMessageResponse{Timestamp: strconv.FormatInt((*timestamps)[0].Timestamp, 10)}) c.JSON(201, SendMessageResponse{Timestamp: strconv.FormatInt((*timestamps)[0].Timestamp, 10)})
} }
func (a *Api) handleSignalReceive(ws *websocket.Conn, number string) { func (a *Api) handleSignalReceive(ws *websocket.Conn, number string, stop chan struct{}) {
receiveChannel, err := a.signalClient.GetReceiveChannel(number)
if err != nil {
log.Error("Couldn't get receive channel: ", err.Error())
return
}
for { for {
data, err := a.signalClient.Receive(number, 0) select {
if err == nil { case <-stop:
err = ws.WriteMessage(websocket.TextMessage, []byte(data)) ws.Close()
if err != nil { return
log.Error("Couldn't write message: " + err.Error()) case msg := <-receiveChannel:
return var data string = string(msg.Params)
var err error = nil
if msg.Err.Code != 0 {
err = errors.New(msg.Err.Message)
} }
} else {
errorMsg := Error{Msg: err.Error()} if err == nil {
errorMsgBytes, err := json.Marshal(errorMsg) if data != "" {
if err != nil { err = ws.WriteMessage(websocket.TextMessage, []byte(data))
log.Error("Couldn't serialize error message: " + err.Error()) if err != nil {
return if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
} log.Error("Couldn't write message: " + err.Error())
err = ws.WriteMessage(websocket.TextMessage, errorMsgBytes) }
if err != nil { return
log.Error("Couldn't write message: " + err.Error()) }
return }
} else {
errorMsg := Error{Msg: err.Error()}
errorMsgBytes, err := json.Marshal(errorMsg)
if err != nil {
log.Error("Couldn't serialize error message: " + err.Error())
return
}
err = ws.WriteMessage(websocket.TextMessage, errorMsgBytes)
if err != nil {
log.Error("Couldn't write message: " + err.Error())
return
}
} }
} }
} }
} }
func wsPong(ws *websocket.Conn) { func wsPong(ws *websocket.Conn, stop chan struct{}) {
defer func() {
close(stop)
ws.Close()
}()
ws.SetReadLimit(512) ws.SetReadLimit(512)
ws.SetPongHandler(func(string) error { log.Debug("Received pong"); return nil }) ws.SetPongHandler(func(string) error { log.Debug("Received pong"); return nil })
for { for {
@@ -368,10 +395,13 @@ func wsPong(ws *websocket.Conn) {
} }
} }
func wsPing(ws *websocket.Conn) { func wsPing(ws *websocket.Conn, stop chan struct{}) {
pingTicker := time.NewTicker(pingPeriod) pingTicker := time.NewTicker(pingPeriod)
for { for {
select { select {
case <-stop:
ws.Close()
return
case <-pingTicker.C: case <-pingTicker.C:
if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil { if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
return return
@@ -400,9 +430,10 @@ func (a *Api) Receive(c *gin.Context) {
return return
} }
defer ws.Close() defer ws.Close()
go a.handleSignalReceive(ws, number) var stop = make(chan struct{})
go wsPing(ws) go a.handleSignalReceive(ws, number, stop)
wsPong(ws) go wsPing(ws, stop)
wsPong(ws, stop)
} else { } else {
timeout := c.DefaultQuery("timeout", "1") timeout := c.DefaultQuery("timeout", "1")
timeoutInt, err := strconv.ParseInt(timeout, 10, 32) timeoutInt, err := strconv.ParseInt(timeout, 10, 32)

View File

@@ -591,15 +591,7 @@ func (s *SignalClient) SendV2(number string, message string, recps []string, bas
func (s *SignalClient) Receive(number string, timeout int64) (string, error) { func (s *SignalClient) Receive(number string, timeout int64) (string, error) {
if s.signalCliMode == JsonRpc { if s.signalCliMode == JsonRpc {
jsonRpc2Client, err := s.getJsonRpc2Client(number) return "", errors.New("Not implemented")
if err != nil {
return "", err
}
msg := jsonRpc2Client.ReceiveMessage()
if msg.Err.Code != 0 {
return "", errors.New(msg.Err.Message)
}
return string(msg.Params), nil
} else { } else {
command := []string{"--config", s.signalCliConfig, "--output", "json", "-a", number, "receive", "-t", strconv.FormatInt(timeout, 10)} command := []string{"--config", s.signalCliConfig, "--output", "json", "-a", number, "receive", "-t", strconv.FormatInt(timeout, 10)}
@@ -624,6 +616,14 @@ func (s *SignalClient) Receive(number string, timeout int64) (string, error) {
} }
} }
func (s *SignalClient) GetReceiveChannel(number string) (chan JsonRpc2ReceivedMessage, error) {
jsonRpc2Client, err := s.getJsonRpc2Client(number)
if err != nil {
return nil, err
}
return jsonRpc2Client.GetReceiveChannel(), nil
}
func (s *SignalClient) CreateGroup(number string, name string, members []string, description string, editGroupPermission GroupPermission, addMembersPermission GroupPermission, groupLinkState GroupLinkState) (string, error) { func (s *SignalClient) CreateGroup(number string, name string, members []string, description string, editGroupPermission GroupPermission, addMembersPermission GroupPermission, groupLinkState GroupLinkState) (string, error) {
var internalGroupId string var internalGroupId string
if s.signalCliMode == JsonRpc { if s.signalCliMode == JsonRpc {

View File

@@ -133,8 +133,6 @@ func (r *JsonRpc2Client) ReceiveData(number string) {
} }
} }
//blocks until message a message is received func (r *JsonRpc2Client) GetReceiveChannel() chan JsonRpc2ReceivedMessage {
func (r *JsonRpc2Client) ReceiveMessage() JsonRpc2ReceivedMessage { return r.receivedMessages
resp := <-r.receivedMessages
return resp
} }