1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

moved setting of cmp nats header value

This commit is contained in:
postmannen 2021-12-29 06:40:42 +01:00
parent 37084768f7
commit ce053fc651

View file

@ -195,7 +195,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
// It will also take care of the delivering the message that is converted to
// gob format as a nats.Message. It will also take care of checking timeouts
// and retries specified for the message.
func (p process) messageDeliverNats(natsMsgPayload []byte, natsConn *nats.Conn, message Message) {
func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.Header, natsConn *nats.Conn, message Message) {
retryAttempts := 0
const publishTimer time.Duration = 5
@ -209,18 +209,11 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsConn *nats.Conn,
// Subject: fmt.Sprintf("%s.%s.%s", proc.node, "command", "CLICommandRequest"),
// Structure of the reply message are:
// <nodename>.<message type>.<method>.reply
Reply: fmt.Sprintf("%s.reply", p.subject.name()),
Data: natsMsgPayload,
Reply: fmt.Sprintf("%s.reply", p.subject.name()),
Data: natsMsgPayload,
Header: natsMsgHeader,
}
if p.configuration.Compression != "" {
msg.Header = nats.Header{
"cmp": []string{p.configuration.Compression},
}
// fmt.Printf(" * DEBUG: compression enabled, trying to set header: %v\n", msg.Header)
}
// fmt.Printf(" * DEBUG: header value after if block: %v\n", msg.Header)
// The SubscribeSync used in the subscriber, will get messages that
// are sent after it started subscribing.
//
@ -481,6 +474,10 @@ func (p process) publishMessages(natsConn *nats.Conn) {
return
}
// Create the initial header, and set values below depending on the
// various configuration options chosen.
natsMsgHeader := nats.Header{}
// encode the message structure into gob binary format before putting
// it into a nats message.
// Prepare a gob encoder with a buffer before we start the loop
@ -512,6 +509,8 @@ func (p process) publishMessages(natsConn *nats.Conn) {
natsMsgPayload = enc.EncodeAll(bufGob.Bytes(), nil)
fmt.Printf("* len of zstd mJson : %v\n", len(natsMsgPayload))
natsMsgHeader["cmp"] = []string{p.configuration.Compression}
case "": // no compression
natsMsgPayload = bufGob.Bytes()
@ -530,7 +529,7 @@ func (p process) publishMessages(natsConn *nats.Conn) {
// Create the Nats message with headers and payload, and do the
// sending of the message.
p.messageDeliverNats(natsMsgPayload, natsConn, m)
p.messageDeliverNats(natsMsgPayload, natsMsgHeader, natsConn, m)
fmt.Printf(" * DEBUG 2: %v\n", len(bufGob.Bytes()))