1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

debugging gob encoding

This commit is contained in:
postmannen 2022-02-17 08:14:56 +01:00
parent 9e9a48ee35
commit cd348b5a0c

View file

@ -358,6 +358,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
// fmt.Printf(" * DEBUG: ok = %v, map = %v, len of val = %v\n", ok, msg.Header, len(val))
switch val[0] {
case "z":
fmt.Printf(" * DEBUG: SUBSCRIBING: Compress got z for message of type: %v\n", p.subject.name())
zr, err := zstd.NewReader(nil)
if err != nil {
log.Printf("error: zstd NewReader failed: %v\n", err)
@ -375,6 +376,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
zr.Close()
case "g":
fmt.Printf(" * DEBUG: SUBSCRIBING: Compress got g for message of type: %v\n", p.subject.name())
r := bytes.NewReader(msgData)
gr, err := gzip.NewReader(r)
if err != nil {
@ -402,6 +404,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
// fmt.Printf(" * DEBUG: ok = %v, map = %v, len of val = %v\n", ok, msg.Header, len(val))
switch val[0] {
case "cbor":
fmt.Printf(" * DEBUG: SUBSCRIBING: Found serial header with cbor for message of type: %v\n", p.subject.name())
err := cbor.Unmarshal(msgData, &message)
if err != nil {
er := fmt.Errorf("error: cbor decoding failed: %v", err)
@ -410,6 +413,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
return
}
default: // Deaults to gob if no match was found.
fmt.Printf(" * DEBUG: SUBSCRIBING: Found serial header, but cbor not defined for message of type: %v, defaulting to GOB\n", p.subject.name())
r := bytes.NewReader(msgData)
gobDec := gob.NewDecoder(r)
@ -422,6 +426,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
}
} else {
fmt.Printf(" * DEBUG: SUBSCRIBING: Found NO serial header for message of type: %v\n", p.subject.name())
// Default to gob if serialization flag was not specified.
r := bytes.NewReader(msgData)
gobDec := gob.NewDecoder(r)
@ -618,7 +623,7 @@ func (p process) addMethodArgSignature(m Message) []byte {
func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once, natsConn *nats.Conn) {
// Create the initial header, and set values below depending on the
// various configuration options chosen.
natsMsgHeader := nats.Header{}
natsMsgHeader := make(nats.Header)
// The serialized value of the nats message payload
var natsMsgPayloadSerialized []byte
@ -637,6 +642,7 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once,
natsMsgPayloadSerialized = b
natsMsgHeader["serial"] = []string{p.configuration.Serialization}
fmt.Printf(" * DEBUG: PUBLISHING: set CBOR for message of type: %v\n", p.subject.name())
default:
var bufGob bytes.Buffer
@ -649,6 +655,8 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once,
}
natsMsgPayloadSerialized = bufGob.Bytes()
natsMsgHeader["serial"] = []string{"gob"}
fmt.Printf(" * DEBUG: PUBLISHING: no serialization flag specified, set GOB for message of type: %v\n", p.subject.name())
}
// Get the process name so we can look up the process in the
@ -671,6 +679,7 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once,
natsMsgHeader["cmp"] = []string{p.configuration.Compression}
zEnc.Reset(nil)
fmt.Printf(" * DEBUG: PUBLISHING: compression got z, set %v for message of type: %v\n", "z", p.subject.name())
case "g": // gzip
var buf bytes.Buffer
@ -685,12 +694,16 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once,
natsMsgPayloadCompressed = buf.Bytes()
natsMsgHeader["cmp"] = []string{p.configuration.Compression}
fmt.Printf(" * DEBUG: PUBLISHING: compression got g, set %v for message of type: %v\n", p.configuration.Compression, p.subject.name())
case "": // no compression
natsMsgPayloadCompressed = natsMsgPayloadSerialized
natsMsgHeader["cmp"] = []string{"none"}
fmt.Printf(" * DEBUG: PUBLISHING: Compress got \"\" for publishing: set %v for message of type: %v\n", "none", p.subject.name())
default: // no compression
// Allways log the error to console.
er := fmt.Errorf("error: compression type not defined, setting default to zero compression")
er := fmt.Errorf("error: PUBLISHING: compression type not defined, setting default to no compression")
log.Printf("%v\n", er)
// We only wan't to send the error message to errorCentral once.
@ -698,7 +711,11 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once,
p.processes.errorKernel.errSend(p, m, er)
})
// No compression, so we just assign the value of the serialized
// data directly to the variable used with messageDeliverNats.
natsMsgPayloadCompressed = natsMsgPayloadSerialized
natsMsgHeader["cmp"] = []string{"none"}
fmt.Printf(" * DEBUG: Compress got nothing defined for publishing: set %v for message of type: %v\n", "none", p.subject.name())
}
// Create the Nats message with headers and payload, and do the