implemented receive endpoint via websocket connection

This commit is contained in:
Bernhard B
2021-09-13 22:09:33 +02:00
parent d8a5ddfc98
commit 760883bdca
5 changed files with 133 additions and 38 deletions

View File

@@ -9,6 +9,7 @@ import (
"github.com/gabriel-vasile/mimetype" "github.com/gabriel-vasile/mimetype"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/gorilla/websocket"
"github.com/bbernhard/signal-cli-rest-api/client" "github.com/bbernhard/signal-cli-rest-api/client"
utils "github.com/bbernhard/signal-cli-rest-api/utils" utils "github.com/bbernhard/signal-cli-rest-api/utils"
@@ -82,6 +83,12 @@ type SendMessageResponse struct {
Timestamp string `json:"timestamp"` Timestamp string `json:"timestamp"`
} }
var connectionUpgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
type Api struct { type Api struct {
signalClient *client.SignalClient signalClient *client.SignalClient
} }
@@ -273,20 +280,51 @@ func (a *Api) SendV2(c *gin.Context) {
func (a *Api) Receive(c *gin.Context) { func (a *Api) Receive(c *gin.Context) {
number := c.Param("number") number := c.Param("number")
timeout := c.DefaultQuery("timeout", "1") if a.signalClient.GetSignalCliMode() == client.JsonRpc {
timeoutInt, err := strconv.ParseInt(timeout, 10, 32) ws, err := connectionUpgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil { if err != nil {
c.JSON(400, Error{Msg: "Couldn't process request - timeout needs to be numeric!"}) c.JSON(400, Error{Msg: err.Error()})
return return
} }
defer ws.Close()
for {
data, err := a.signalClient.Receive(number, 0)
if err == nil {
err = ws.WriteMessage(websocket.TextMessage, []byte(data))
if err != nil {
log.Error("Couldn't write message: " + err.Error())
return
}
} else {
errorMsg := Error{Msg: err.Error()}
errorMsgBytes, err := json.Marshal(errorMsg)
if err != nil {
log.Error("Couldn't serialize error message: " + err.Error())
return
}
err = ws.WriteMessage(websocket.TextMessage, errorMsgBytes)
if err != nil {
log.Error("Couldn't write message: " + err.Error())
return
}
}
}
} else {
timeout := c.DefaultQuery("timeout", "1")
timeoutInt, err := strconv.ParseInt(timeout, 10, 32)
if err != nil {
c.JSON(400, Error{Msg: "Couldn't process request - timeout needs to be numeric!"})
return
}
jsonStr, err := a.signalClient.Receive(number, timeoutInt) jsonStr, err := a.signalClient.Receive(number, timeoutInt)
if err != nil { if err != nil {
c.JSON(400, Error{Msg: err.Error()}) c.JSON(400, Error{Msg: err.Error()})
return return
} }
c.String(200, jsonStr) c.String(200, jsonStr)
}
} }
// @Summary Create a new Signal Group. // @Summary Create a new Signal Group.

View File

@@ -297,6 +297,10 @@ func NewSignalClient(signalCliConfig string, attachmentTmpDir string, avatarTmpD
} }
} }
func (s *SignalClient) GetSignalCliMode() SignalCliMode {
return s.signalCliMode
}
func (s *SignalClient) Init() error { func (s *SignalClient) Init() error {
if s.signalCliMode == JsonRpc { if s.signalCliMode == JsonRpc {
s.jsonRpc2ClientConfig = utils.NewJsonRpc2ClientConfig() s.jsonRpc2ClientConfig = utils.NewJsonRpc2ClientConfig()
@@ -312,6 +316,8 @@ func (s *SignalClient) Init() error {
if err != nil { if err != nil {
return err return err
} }
go s.jsonRpc2Clients[number].ReceiveData() //receive messages in goroutine
} }
} }
return nil return nil
@@ -542,8 +548,16 @@ func (s *SignalClient) SendV2(number string, message string, recps []string, bas
} }
func (s *SignalClient) Receive(number string, timeout int64) (string, error) { func (s *SignalClient) Receive(number string, timeout int64) (string, error) {
if s.signalCliMode == Native { if s.signalCliMode == JsonRpc {
return "", errors.New(endpointNotSupportedInJsonRpcMode) jsonRpc2Client, err := s.getJsonRpc2Client(number)
if err != nil {
return "", err
}
msg := jsonRpc2Client.ReceiveMessage()
if msg.Err.Code != 0 {
return "", errors.New(msg.Err.Message)
}
return string(msg.Params), nil
} else { } else {
command := []string{"--config", s.signalCliConfig, "--output", "json", "-u", number, "receive", "-t", strconv.FormatInt(timeout, 10)} command := []string{"--config", s.signalCliConfig, "--output", "json", "-u", number, "receive", "-t", strconv.FormatInt(timeout, 10)}

View File

@@ -9,8 +9,27 @@ import (
"net" "net"
) )
type Error struct {
Code int `json:"code"`
Message string `json:"message"`
}
type JsonRpc2MessageResponse struct {
Id string `json:"id"`
Result json.RawMessage `json:"result"`
Err Error `json:"error"`
}
type JsonRpc2ReceivedMessage struct {
Method string `json:"method"`
Params json.RawMessage `json:"params"`
Err Error `json:"error"`
}
type JsonRpc2Client struct { type JsonRpc2Client struct {
conn net.Conn conn net.Conn
receivedMessageResponses chan JsonRpc2MessageResponse
receivedMessages chan JsonRpc2ReceivedMessage
} }
func NewJsonRpc2Client() *JsonRpc2Client { func NewJsonRpc2Client() *JsonRpc2Client {
@@ -24,6 +43,9 @@ func (r *JsonRpc2Client) Dial(address string) error {
return err return err
} }
r.receivedMessageResponses = make(chan JsonRpc2MessageResponse)
r.receivedMessages = make(chan JsonRpc2ReceivedMessage)
return nil return nil
} }
@@ -35,17 +57,6 @@ func (r *JsonRpc2Client) getRaw(command string, args interface{}) (string, error
Params interface{} `json:"params,omitempty"` Params interface{} `json:"params,omitempty"`
} }
type Error struct {
Code int `json:"code"`
Message string `json:"message"`
}
type Response struct {
Id string `json:"id"`
Result json.RawMessage `json:"result"`
Err Error `json:"error"`
}
u, err := uuid.NewV4() u, err := uuid.NewV4()
if err != nil { if err != nil {
return "", err return "", err
@@ -68,27 +79,56 @@ func (r *JsonRpc2Client) getRaw(command string, args interface{}) (string, error
return "", err return "", err
} }
var resp JsonRpc2MessageResponse
for {
resp = <-r.receivedMessageResponses
if resp.Id == u.String() {
break
}
}
if resp.Err.Code != 0 {
return "", errors.New(resp.Err.Message)
}
return string(resp.Result), nil
}
func (r *JsonRpc2Client) ReceiveData() {
connbuf := bufio.NewReader(r.conn) connbuf := bufio.NewReader(r.conn)
for { for {
str, err := connbuf.ReadString('\n') str, err := connbuf.ReadString('\n')
if err != nil { if err != nil {
return "", err log.Error("Couldn't read data: ", err.Error())
continue
}
//log.Info("Received data = ", str)
var resp1 JsonRpc2ReceivedMessage
json.Unmarshal([]byte(str), &resp1)
if resp1.Method == "receive" {
select {
case r.receivedMessages <- resp1:
log.Debug("Message sent to golang channel")
default:
log.Debug("Couldn't send message to golang channel, as there's no receiver")
}
continue
} }
var resp Response var resp2 JsonRpc2MessageResponse
err = json.Unmarshal([]byte(str), &resp) err = json.Unmarshal([]byte(str), &resp2)
if err == nil { if err == nil {
if resp.Id == u.String() { if resp2.Id != "" {
log.Info("Response1 = ", string(resp.Result)) r.receivedMessageResponses <- resp2
if resp.Err.Code != 0 {
return "", errors.New(resp.Err.Message)
}
return string(resp.Result), nil
} }
} else { } else {
log.Info("Response = ", str) log.Error("Received unparsable message: ", str)
} }
} }
}
return "", errors.New("no data")
//blocks until message a message is received
func (r *JsonRpc2Client) ReceiveMessage() JsonRpc2ReceivedMessage {
resp := <-r.receivedMessages
return resp
} }

View File

@@ -11,6 +11,7 @@ require (
github.com/go-openapi/spec v0.19.8 // indirect github.com/go-openapi/spec v0.19.8 // indirect
github.com/go-openapi/swag v0.19.9 // indirect github.com/go-openapi/swag v0.19.9 // indirect
github.com/gofrs/uuid v3.3.0+incompatible github.com/gofrs/uuid v3.3.0+incompatible
github.com/gorilla/websocket v1.4.2
github.com/h2non/filetype v1.1.0 github.com/h2non/filetype v1.1.0
github.com/mailru/easyjson v0.7.1 // indirect github.com/mailru/easyjson v0.7.1 // indirect
github.com/robfig/cron/v3 v3.0.1 github.com/robfig/cron/v3 v3.0.1

View File

@@ -69,6 +69,8 @@ github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
github.com/golang/protobuf v1.3.3 h1:gyjaxf+svBWX08ZjK86iN9geUJF0H6gp2IRKX6Nf6/I= github.com/golang/protobuf v1.3.3 h1:gyjaxf+svBWX08ZjK86iN9geUJF0H6gp2IRKX6Nf6/I=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/h2non/filetype v1.1.0 h1:Or/gjocJrJRNK/Cri/TDEKFjAR+cfG6eK65NGYB6gBA= github.com/h2non/filetype v1.1.0 h1:Or/gjocJrJRNK/Cri/TDEKFjAR+cfG6eK65NGYB6gBA=
github.com/h2non/filetype v1.1.0/go.mod h1:319b3zT68BvV+WRj7cwy856M2ehB3HqNOt6sy1HndBY= github.com/h2non/filetype v1.1.0/go.mod h1:319b3zT68BvV+WRj7cwy856M2ehB3HqNOt6sy1HndBY=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=