From 3bf5fca5cd02ac952f539c05c221fcb258ac05a1 Mon Sep 17 00:00:00 2001 From: postmannen Date: Wed, 29 Dec 2021 08:11:43 +0100 Subject: [PATCH] added cbor subscriber decoding --- process.go | 46 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/process.go b/process.go index 83b11be..2f60557 100644 --- a/process.go +++ b/process.go @@ -338,16 +338,41 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, message := Message{} - // Create a buffer to decode the gob encoded binary data back - // to it's original structure. - buf := bytes.NewBuffer(msgData) - gobDec := gob.NewDecoder(buf) - err := gobDec.Decode(&message) - if err != nil { - er := fmt.Errorf("error: gob decoding failed: %v", err) - log.Printf("%v\n", er) - sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er) - return + // Is serializatio + if val, ok := msg.Header["serial"]; ok { + // fmt.Printf(" * DEBUG: ok = %v, map = %v, len of val = %v\n", ok, msg.Header, len(val)) + switch val[0] { + case "cbor": + err := cbor.Unmarshal(msgData, &message) + if err != nil { + er := fmt.Errorf("error: cbor decoding failed: %v", err) + log.Printf("%v\n", er) + sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er) + return + } + default: // Deaults to gob if no match was found. + buf := bytes.NewBuffer(msgData) + gobDec := gob.NewDecoder(buf) + err := gobDec.Decode(&message) + if err != nil { + er := fmt.Errorf("error: gob decoding failed: %v", err) + log.Printf("%v\n", er) + sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er) + return + } + } + + } else { + // Default to gob if serialization flag was not specified. + buf := bytes.NewBuffer(msgData) + gobDec := gob.NewDecoder(buf) + err := gobDec.Decode(&message) + if err != nil { + er := fmt.Errorf("error: gob decoding failed: %v", err) + log.Printf("%v\n", er) + sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er) + return + } } // Send final reply for a relayed message back to the originating node. @@ -402,6 +427,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, } var out []byte + var err error out, err = mh.handler(p, message, thisNode)