mirror of
https://github.com/aljazceru/lspd.git
synced 2026-02-23 15:04:23 +01:00
lsps0: handle requests in a goroutine
This commit is contained in:
@@ -20,6 +20,7 @@ var ErrServerStopped = errors.New("lsps0: the server has been stopped")
|
||||
var Lsps0MessageType uint32 = 37913
|
||||
var BadMessageFormatError string = "bad message format"
|
||||
var InternalError string = "internal error"
|
||||
var MaxSimultaneousRequests = 25
|
||||
|
||||
// ServiceDesc and is constructed from it for internal purposes.
|
||||
type serviceInfo struct {
|
||||
@@ -61,6 +62,7 @@ func (s *Server) Serve(lis lightning.CustomMsgClient) error {
|
||||
s.mu.Unlock()
|
||||
}()
|
||||
|
||||
guard := make(chan struct{}, MaxSimultaneousRequests)
|
||||
for {
|
||||
msg, err := lis.Recv()
|
||||
if err != nil {
|
||||
@@ -113,6 +115,9 @@ func (s *Server) Serve(lis lightning.CustomMsgClient) error {
|
||||
continue
|
||||
}
|
||||
|
||||
// Deserialization step of the request params. This function is called
|
||||
// by method handlers of service implementations to deserialize the
|
||||
// typed request object.
|
||||
df := func(v interface{}) error {
|
||||
if err := json.Unmarshal(req.Params, v); err != nil {
|
||||
return status.Newf(codes.InvalidParams, "invalid params").Err()
|
||||
@@ -121,9 +126,19 @@ func (s *Server) Serve(lis lightning.CustomMsgClient) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// NOTE: The handler is being called synchonously. There's an option to
|
||||
// do this in a goroutine instead. Also, there's the option to put the
|
||||
// goroutine in the method desc for specific methods instead.
|
||||
// Will block if the guard queue is already filled to ensure
|
||||
// MaxSimultaneousRequests is not exceeded.
|
||||
guard <- struct{}{}
|
||||
|
||||
// NOTE: The handler is being called asynchonously. This may cause the
|
||||
// order of messages handled to be different from the order in which
|
||||
// they were received.
|
||||
go func() {
|
||||
// Releases a queued item in the guard, to release a spot for
|
||||
// another simultaneous request.
|
||||
defer func() { <-guard }()
|
||||
|
||||
// Call the method handler for the requested method.
|
||||
r, err := m.method.Handler(m.service.serviceImpl, context.TODO(), df)
|
||||
if err != nil {
|
||||
s, ok := status.FromError(err)
|
||||
@@ -132,11 +147,12 @@ func (s *Server) Serve(lis lightning.CustomMsgClient) error {
|
||||
s = status.New(codes.InternalError, InternalError)
|
||||
}
|
||||
|
||||
go sendError(lis, msg, req, s)
|
||||
continue
|
||||
sendError(lis, msg, req, s)
|
||||
return
|
||||
}
|
||||
|
||||
go sendResponse(lis, msg, req, r)
|
||||
sendResponse(lis, msg, req, r)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user