1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-05 14:56:49 +00:00

added cbor subscriber decoding

This commit is contained in:
postmannen 2021-12-29 08:11:43 +01:00
parent 574bed573a
commit 3bf5fca5cd

View file

@ -338,16 +338,41 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
message := Message{}
// Create a buffer to decode the gob encoded binary data back
// to it's original structure.
buf := bytes.NewBuffer(msgData)
gobDec := gob.NewDecoder(buf)
err := gobDec.Decode(&message)
if err != nil {
er := fmt.Errorf("error: gob decoding failed: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
return
// Is serializatio
if val, ok := msg.Header["serial"]; ok {
// fmt.Printf(" * DEBUG: ok = %v, map = %v, len of val = %v\n", ok, msg.Header, len(val))
switch val[0] {
case "cbor":
err := cbor.Unmarshal(msgData, &message)
if err != nil {
er := fmt.Errorf("error: cbor decoding failed: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
return
}
default: // Deaults to gob if no match was found.
buf := bytes.NewBuffer(msgData)
gobDec := gob.NewDecoder(buf)
err := gobDec.Decode(&message)
if err != nil {
er := fmt.Errorf("error: gob decoding failed: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
return
}
}
} else {
// Default to gob if serialization flag was not specified.
buf := bytes.NewBuffer(msgData)
gobDec := gob.NewDecoder(buf)
err := gobDec.Decode(&message)
if err != nil {
er := fmt.Errorf("error: gob decoding failed: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
return
}
}
// Send final reply for a relayed message back to the originating node.
@ -402,6 +427,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
}
var out []byte
var err error
out, err = mh.handler(p, message, thisNode)