1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

restructured function logic for sending messages

This commit is contained in:
postmannen 2022-02-01 07:22:06 +01:00
parent 7b230f1fc6
commit 130f9fc57e

View file

@ -589,150 +589,151 @@ func (p process) publishMessages(natsConn *nats.Conn) {
// to jump back here to the beginning of the loop and continue
// with the next message.
for {
var err error
var m Message
// Wait and read the next message on the message channel, or
// exit this function if Cancel are received via ctx.
select {
case m = <-p.subject.messageCh:
case m := <-p.subject.messageCh:
p.publishAMessage(m, zEnc, once, natsConn)
case <-p.ctx.Done():
er := fmt.Errorf("info: canceling publisher: %v", p.subject.name())
//sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
log.Printf("%v\n", er)
return
}
// Create the initial header, and set values below depending on the
// various configuration options chosen.
natsMsgHeader := nats.Header{}
// The serialized value of the nats message payload
var natsMsgPayloadSerialized []byte
// encode the message structure into gob binary format before putting
// it into a nats message.
// Prepare a gob encoder with a buffer before we start the loop
switch p.configuration.Serialization {
case "cbor":
b, err := cbor.Marshal(m)
if err != nil {
er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err)
p.processes.errorKernel.errSend(p, m, er)
continue
}
natsMsgPayloadSerialized = b
natsMsgHeader["serial"] = []string{p.configuration.Serialization}
default:
var bufGob bytes.Buffer
gobEnc := gob.NewEncoder(&bufGob)
err = gobEnc.Encode(m)
if err != nil {
er := fmt.Errorf("error: messageDeliverNats: gob encode message failed: %v", err)
p.processes.errorKernel.errSend(p, m, er)
continue
}
natsMsgPayloadSerialized = bufGob.Bytes()
}
// Get the process name so we can look up the process in the
// processes map, and increment the message counter.
pn := processNameGet(p.subject.name(), processKindPublisher)
m.ID = p.messageID
// The compressed value of the nats message payload. The content
// can either be compressed or in it's original form depening on
// the outcome of the switch below, and if compression were chosen
// or not.
var natsMsgPayloadCompressed []byte
// Compress the data payload if selected with configuration flag.
// The compression chosen is later set in the nats msg header when
// calling p.messageDeliverNats below.
switch p.configuration.Compression {
case "z": // zstd
natsMsgPayloadCompressed = zEnc.EncodeAll(natsMsgPayloadSerialized, nil)
natsMsgHeader["cmp"] = []string{p.configuration.Compression}
zEnc.Reset(nil)
case "g": // gzip
var buf bytes.Buffer
gzipW := gzip.NewWriter(&buf)
_, err := gzipW.Write(natsMsgPayloadSerialized)
if err != nil {
log.Printf("error: failed to write gzip: %v\n", err)
gzipW.Close()
continue
}
gzipW.Close()
natsMsgPayloadCompressed = buf.Bytes()
natsMsgHeader["cmp"] = []string{p.configuration.Compression}
case "": // no compression
natsMsgPayloadCompressed = natsMsgPayloadSerialized
default: // no compression
// Allways log the error to console.
er := fmt.Errorf("error: compression type not defined: %v, setting default to zero compression", err)
log.Printf("%v\n", er)
// We only wan't to send the error message to errorCentral once.
once.Do(func() {
p.processes.errorKernel.errSend(p, m, er)
})
natsMsgPayloadCompressed = natsMsgPayloadSerialized
}
// Create the Nats message with headers and payload, and do the
// sending of the message.
p.messageDeliverNats(natsMsgPayloadCompressed, natsMsgHeader, natsConn, m)
// Signaling back to the ringbuffer that we are done with the
// current message, and it can remove it from the ringbuffer.
m.done <- struct{}{}
// Increment the counter for the next message to be sent.
p.messageID++
{
p.processes.active.mu.Lock()
p.processes.active.procNames[pn] = p
p.processes.active.mu.Unlock()
}
// Handle the error.
//
// NOTE: None of the processes above generate an error, so the the
// if clause will never be triggered. But keeping it here as an example
// for now for how to handle errors.
if err != nil {
// Create an error type which also creates a channel which the
// errorKernel will send back the action about what to do.
ep := errorEvent{
//errorType: logOnly,
process: p,
message: m,
errorActionCh: make(chan errorAction),
}
p.errorCh <- ep
// Wait for the response action back from the error kernel, and
// decide what to do. Should we continue, quit, or .... ?
switch <-ep.errorActionCh {
case errActionContinue:
// Just log and continue
log.Printf("The errAction was continue...so we're continuing\n")
case errActionKill:
log.Printf("The errAction was kill...so we're killing\n")
// ....
default:
log.Printf("Info: publishMessages: The errAction was not defined, so we're doing nothing\n")
}
}
}
}
func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once, natsConn *nats.Conn) {
// Create the initial header, and set values below depending on the
// various configuration options chosen.
natsMsgHeader := nats.Header{}
// The serialized value of the nats message payload
var natsMsgPayloadSerialized []byte
// encode the message structure into gob binary format before putting
// it into a nats message.
// Prepare a gob encoder with a buffer before we start the loop
switch p.configuration.Serialization {
case "cbor":
b, err := cbor.Marshal(m)
if err != nil {
er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err)
p.processes.errorKernel.errSend(p, m, er)
return
}
natsMsgPayloadSerialized = b
natsMsgHeader["serial"] = []string{p.configuration.Serialization}
default:
var bufGob bytes.Buffer
gobEnc := gob.NewEncoder(&bufGob)
err := gobEnc.Encode(m)
if err != nil {
er := fmt.Errorf("error: messageDeliverNats: gob encode message failed: %v", err)
p.processes.errorKernel.errSend(p, m, er)
return
}
natsMsgPayloadSerialized = bufGob.Bytes()
}
// Get the process name so we can look up the process in the
// processes map, and increment the message counter.
pn := processNameGet(p.subject.name(), processKindPublisher)
m.ID = p.messageID
// The compressed value of the nats message payload. The content
// can either be compressed or in it's original form depening on
// the outcome of the switch below, and if compression were chosen
// or not.
var natsMsgPayloadCompressed []byte
// Compress the data payload if selected with configuration flag.
// The compression chosen is later set in the nats msg header when
// calling p.messageDeliverNats below.
switch p.configuration.Compression {
case "z": // zstd
natsMsgPayloadCompressed = zEnc.EncodeAll(natsMsgPayloadSerialized, nil)
natsMsgHeader["cmp"] = []string{p.configuration.Compression}
zEnc.Reset(nil)
case "g": // gzip
var buf bytes.Buffer
gzipW := gzip.NewWriter(&buf)
_, err := gzipW.Write(natsMsgPayloadSerialized)
if err != nil {
log.Printf("error: failed to write gzip: %v\n", err)
gzipW.Close()
return
}
gzipW.Close()
natsMsgPayloadCompressed = buf.Bytes()
natsMsgHeader["cmp"] = []string{p.configuration.Compression}
case "": // no compression
natsMsgPayloadCompressed = natsMsgPayloadSerialized
default: // no compression
// Allways log the error to console.
er := fmt.Errorf("error: compression type not defined, setting default to zero compression")
log.Printf("%v\n", er)
// We only wan't to send the error message to errorCentral once.
once.Do(func() {
p.processes.errorKernel.errSend(p, m, er)
})
natsMsgPayloadCompressed = natsMsgPayloadSerialized
}
// Create the Nats message with headers and payload, and do the
// sending of the message.
p.messageDeliverNats(natsMsgPayloadCompressed, natsMsgHeader, natsConn, m)
// Signaling back to the ringbuffer that we are done with the
// current message, and it can remove it from the ringbuffer.
m.done <- struct{}{}
// Increment the counter for the next message to be sent.
p.messageID++
{
p.processes.active.mu.Lock()
p.processes.active.procNames[pn] = p
p.processes.active.mu.Unlock()
}
// // Handle the error.
// //
// // NOTE: None of the processes above generate an error, so the the
// // if clause will never be triggered. But keeping it here as an example
// // for now for how to handle errors.
// if err != nil {
// // Create an error type which also creates a channel which the
// // errorKernel will send back the action about what to do.
// ep := errorEvent{
// //errorType: logOnly,
// process: p,
// message: m,
// errorActionCh: make(chan errorAction),
// }
// p.errorCh <- ep
//
// // Wait for the response action back from the error kernel, and
// // decide what to do. Should we continue, quit, or .... ?
// switch <-ep.errorActionCh {
// case errActionContinue:
// // Just log and continue
// log.Printf("The errAction was continue...so we're continuing\n")
// case errActionKill:
// log.Printf("The errAction was kill...so we're killing\n")
// // ....
// default:
// log.Printf("Info: publishMessages: The errAction was not defined, so we're doing nothing\n")
// }
// }
}