Files
kata-containers/virtcontainers/pkg/hyperstart/multicast.go
Graham whaley d6c3ec864b license: SPDX: update all vc files to use SPDX style
When imported, the vc files carried in the 'full style' apache
license text, but the standard for kata is to use SPDX style.
Update the relevant files to SPDX.

Fixes: #227

Signed-off-by: Graham whaley <graham.whaley@intel.com>
2018-04-18 13:43:15 +01:00

165 lines
3.4 KiB
Go

// Copyright (c) 2017 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
//
package hyperstart
import (
"encoding/json"
"fmt"
"net"
"sync"
)
type ctlDataType string
const (
eventType ctlDataType = "ctlEvent"
replyType ctlDataType = "ctlReply"
)
type multicast struct {
bufReplies []*DecodedMessage
reply []chan *DecodedMessage
event map[string]chan *DecodedMessage
ctl net.Conn
sync.Mutex
}
func newMulticast(ctlConn net.Conn) *multicast {
return &multicast{
bufReplies: []*DecodedMessage{},
reply: []chan *DecodedMessage{},
event: make(map[string]chan *DecodedMessage),
ctl: ctlConn,
}
}
func startCtlMonitor(ctlConn net.Conn, done chan<- interface{}) *multicast {
ctlMulticast := newMulticast(ctlConn)
go func() {
for {
msg, err := ReadCtlMessage(ctlMulticast.ctl)
if err != nil {
hyperLog.Infof("Read on CTL channel ended: %s", err)
break
}
err = ctlMulticast.write(msg)
if err != nil {
hyperLog.Errorf("Multicaster write error: %s", err)
break
}
}
close(done)
}()
return ctlMulticast
}
func (m *multicast) buildEventID(containerID, processID string) string {
return fmt.Sprintf("%s-%s", containerID, processID)
}
func (m *multicast) sendEvent(msg *DecodedMessage) error {
var paeData PAECommand
err := json.Unmarshal(msg.Message, &paeData)
if err != nil {
return err
}
uniqueID := m.buildEventID(paeData.Container, paeData.Process)
channel, exist := m.event[uniqueID]
if !exist {
return nil
}
channel <- msg
delete(m.event, uniqueID)
return nil
}
func (m *multicast) sendReply(msg *DecodedMessage) error {
m.Lock()
if len(m.reply) == 0 {
m.bufReplies = append(m.bufReplies, msg)
m.Unlock()
return nil
}
replyChannel := m.reply[0]
m.reply = m.reply[1:]
m.Unlock()
// The current reply channel has been removed from the list, that's why
// we can be out of the mutex to send through that channel. Indeed, there
// is no risk that someone else tries to write on this channel.
replyChannel <- msg
return nil
}
func (m *multicast) processBufferedReply(channel chan *DecodedMessage) {
m.Lock()
if len(m.bufReplies) == 0 {
m.reply = append(m.reply, channel)
m.Unlock()
return
}
msg := m.bufReplies[0]
m.bufReplies = m.bufReplies[1:]
m.Unlock()
// The current buffered reply message has been removed from the list, and
// the channel have not been added to the reply list, that's why we can be
// out of the mutex to send the buffered message through that channel.
// There is no risk that someone else tries to write this message on another
// channel, or another message on this channel.
channel <- msg
}
func (m *multicast) write(msg *DecodedMessage) error {
switch msg.Code {
case NextCode:
return nil
case ProcessAsyncEventCode:
return m.sendEvent(msg)
default:
return m.sendReply(msg)
}
}
func (m *multicast) listen(containerID, processID string, dataType ctlDataType) (chan *DecodedMessage, error) {
switch dataType {
case replyType:
newChan := make(chan *DecodedMessage)
go m.processBufferedReply(newChan)
return newChan, nil
case eventType:
uniqueID := m.buildEventID(containerID, processID)
_, exist := m.event[uniqueID]
if exist {
return nil, fmt.Errorf("Channel already assigned for ID %s", uniqueID)
}
m.event[uniqueID] = make(chan *DecodedMessage)
return m.event[uniqueID], nil
default:
return nil, fmt.Errorf("Unknown data type: %s", dataType)
}
}