1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-18 13:49:29 +00:00

removed log statements from process.go

This commit is contained in:
postmannen 2022-02-18 06:17:06 +01:00
parent 209fb572be
commit c94213fe36

View file

@ -359,13 +359,13 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
case "z": case "z":
zr, err := zstd.NewReader(nil) zr, err := zstd.NewReader(nil)
if err != nil { if err != nil {
log.Printf("error: zstd NewReader failed: %v\n", err) er := fmt.Errorf("error: zstd NewReader failed: %v", err)
p.processes.errorKernel.errSend(p, Message{}, er)
return return
} }
msgData, err = zr.DecodeAll(msg.Data, nil) msgData, err = zr.DecodeAll(msg.Data, nil)
if err != nil { if err != nil {
er := fmt.Errorf("error: zstd decoding failed: %v", err) er := fmt.Errorf("error: zstd decoding failed: %v", err)
log.Printf("%v\n", er)
p.processes.errorKernel.errSend(p, Message{}, er) p.processes.errorKernel.errSend(p, Message{}, er)
zr.Close() zr.Close()
return return
@ -377,13 +377,15 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
r := bytes.NewReader(msgData) r := bytes.NewReader(msgData)
gr, err := gzip.NewReader(r) gr, err := gzip.NewReader(r)
if err != nil { if err != nil {
log.Printf("error: gzip NewReader failed: %v\n", err) er := fmt.Errorf("error: gzip NewReader failed: %v", err)
p.processes.errorKernel.errSend(p, Message{}, er)
return return
} }
b, err := io.ReadAll(gr) b, err := io.ReadAll(gr)
if err != nil { if err != nil {
log.Printf("error: gzip ReadAll failed: %v\n", err) er := fmt.Errorf("error: gzip ReadAll failed: %v", err)
p.processes.errorKernel.errSend(p, Message{}, er)
return return
} }
@ -396,15 +398,14 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
message := Message{} message := Message{}
// Check if serialization is specified. // Check if serialization is specified.
// Will default to gob serialization if nothing or non existing value is specified is specified. // Will default to gob serialization if nothing or non existing value is specified.
if val, ok := msg.Header["serial"]; ok { if val, ok := msg.Header["serial"]; ok {
// fmt.Printf(" * DEBUG: ok = %v, map = %v, len of val = %v\n", ok, msg.Header, len(val)) // fmt.Printf(" * DEBUG: ok = %v, map = %v, len of val = %v\n", ok, msg.Header, len(val))
switch val[0] { switch val[0] {
case "cbor": case "cbor":
err := cbor.Unmarshal(msgData, &message) err := cbor.Unmarshal(msgData, &message)
if err != nil { if err != nil {
er := fmt.Errorf("error: cbor decoding failed: %v", err) er := fmt.Errorf("error: cbor decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
log.Printf("%v\n", er)
p.processes.errorKernel.errSend(p, message, er) p.processes.errorKernel.errSend(p, message, er)
return return
} }
@ -414,7 +415,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
err := gobDec.Decode(&message) err := gobDec.Decode(&message)
if err != nil { if err != nil {
er := fmt.Errorf("error: gob decoding failed serial header, subject: %v, header: %v, error: %v", subject, msg.Header, err) er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
p.processes.errorKernel.errSend(p, message, er) p.processes.errorKernel.errSend(p, message, er)
return return
} }
@ -427,8 +428,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
err := gobDec.Decode(&message) err := gobDec.Decode(&message)
if err != nil { if err != nil {
er := fmt.Errorf("error: gob decoding failed no-serial header, subject: %v, header: %v, error: %v", subject, msg.Header, err) er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
log.Printf("%v\n", er)
p.processes.errorKernel.errSend(p, message, er) p.processes.errorKernel.errSend(p, message, er)
return return
} }
@ -453,8 +453,9 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
case msgCopy.PreviousMessage.RelayReplyMethod == "": case msgCopy.PreviousMessage.RelayReplyMethod == "":
er := fmt.Errorf("error: subscriberHandler: no PreviousMessage.RelayReplyMethod found, defaulting to the reply method of previous message: %v ", msgCopy) er := fmt.Errorf("error: subscriberHandler: no PreviousMessage.RelayReplyMethod found, defaulting to the reply method of previous message: %v ", msgCopy)
p.processes.errorKernel.errSend(p, message, er) p.processes.errorKernel.errSend(p, message, er)
log.Printf("%v\n", er)
msgCopy.Method = msgCopy.PreviousMessage.ReplyMethod msgCopy.Method = msgCopy.PreviousMessage.ReplyMethod
case msgCopy.PreviousMessage.RelayReplyMethod != "": case msgCopy.PreviousMessage.RelayReplyMethod != "":
msgCopy.Method = msgCopy.PreviousMessage.RelayReplyMethod msgCopy.Method = msgCopy.PreviousMessage.RelayReplyMethod
} }
@ -705,7 +706,6 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once,
// data directly to the variable used with messageDeliverNats. // data directly to the variable used with messageDeliverNats.
natsMsgPayloadCompressed = natsMsgPayloadSerialized natsMsgPayloadCompressed = natsMsgPayloadSerialized
natsMsgHeader["cmp"] = []string{"none"} natsMsgHeader["cmp"] = []string{"none"}
fmt.Printf(" * DEBUG: Compress got nothing defined for publishing: set %v for message of type: %v\n", "none", p.subject.name())
} }
// Create the Nats message with headers and payload, and do the // Create the Nats message with headers and payload, and do the