diff --git a/process.go b/process.go index 7a801b8..e66d74f 100644 --- a/process.go +++ b/process.go @@ -589,150 +589,151 @@ func (p process) publishMessages(natsConn *nats.Conn) { // to jump back here to the beginning of the loop and continue // with the next message. for { - var err error - var m Message // Wait and read the next message on the message channel, or // exit this function if Cancel are received via ctx. select { - case m = <-p.subject.messageCh: + case m := <-p.subject.messageCh: + p.publishAMessage(m, zEnc, once, natsConn) case <-p.ctx.Done(): er := fmt.Errorf("info: canceling publisher: %v", p.subject.name()) //sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er) log.Printf("%v\n", er) return } - - // Create the initial header, and set values below depending on the - // various configuration options chosen. - natsMsgHeader := nats.Header{} - - // The serialized value of the nats message payload - var natsMsgPayloadSerialized []byte - - // encode the message structure into gob binary format before putting - // it into a nats message. - // Prepare a gob encoder with a buffer before we start the loop - switch p.configuration.Serialization { - case "cbor": - b, err := cbor.Marshal(m) - if err != nil { - er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err) - p.processes.errorKernel.errSend(p, m, er) - continue - } - - natsMsgPayloadSerialized = b - natsMsgHeader["serial"] = []string{p.configuration.Serialization} - - default: - var bufGob bytes.Buffer - gobEnc := gob.NewEncoder(&bufGob) - err = gobEnc.Encode(m) - if err != nil { - er := fmt.Errorf("error: messageDeliverNats: gob encode message failed: %v", err) - p.processes.errorKernel.errSend(p, m, er) - continue - } - - natsMsgPayloadSerialized = bufGob.Bytes() - } - - // Get the process name so we can look up the process in the - // processes map, and increment the message counter. - pn := processNameGet(p.subject.name(), processKindPublisher) - m.ID = p.messageID - - // The compressed value of the nats message payload. The content - // can either be compressed or in it's original form depening on - // the outcome of the switch below, and if compression were chosen - // or not. - var natsMsgPayloadCompressed []byte - - // Compress the data payload if selected with configuration flag. - // The compression chosen is later set in the nats msg header when - // calling p.messageDeliverNats below. - switch p.configuration.Compression { - case "z": // zstd - natsMsgPayloadCompressed = zEnc.EncodeAll(natsMsgPayloadSerialized, nil) - natsMsgHeader["cmp"] = []string{p.configuration.Compression} - - zEnc.Reset(nil) - - case "g": // gzip - var buf bytes.Buffer - gzipW := gzip.NewWriter(&buf) - _, err := gzipW.Write(natsMsgPayloadSerialized) - if err != nil { - log.Printf("error: failed to write gzip: %v\n", err) - gzipW.Close() - continue - } - gzipW.Close() - - natsMsgPayloadCompressed = buf.Bytes() - natsMsgHeader["cmp"] = []string{p.configuration.Compression} - case "": // no compression - natsMsgPayloadCompressed = natsMsgPayloadSerialized - - default: // no compression - // Allways log the error to console. - er := fmt.Errorf("error: compression type not defined: %v, setting default to zero compression", err) - log.Printf("%v\n", er) - - // We only wan't to send the error message to errorCentral once. - once.Do(func() { - p.processes.errorKernel.errSend(p, m, er) - }) - - natsMsgPayloadCompressed = natsMsgPayloadSerialized - } - - // Create the Nats message with headers and payload, and do the - // sending of the message. - p.messageDeliverNats(natsMsgPayloadCompressed, natsMsgHeader, natsConn, m) - - // Signaling back to the ringbuffer that we are done with the - // current message, and it can remove it from the ringbuffer. - m.done <- struct{}{} - - // Increment the counter for the next message to be sent. - p.messageID++ - - { - p.processes.active.mu.Lock() - p.processes.active.procNames[pn] = p - p.processes.active.mu.Unlock() - } - - // Handle the error. - // - // NOTE: None of the processes above generate an error, so the the - // if clause will never be triggered. But keeping it here as an example - // for now for how to handle errors. - if err != nil { - // Create an error type which also creates a channel which the - // errorKernel will send back the action about what to do. - ep := errorEvent{ - //errorType: logOnly, - process: p, - message: m, - errorActionCh: make(chan errorAction), - } - p.errorCh <- ep - - // Wait for the response action back from the error kernel, and - // decide what to do. Should we continue, quit, or .... ? - switch <-ep.errorActionCh { - case errActionContinue: - // Just log and continue - log.Printf("The errAction was continue...so we're continuing\n") - case errActionKill: - log.Printf("The errAction was kill...so we're killing\n") - // .... - default: - log.Printf("Info: publishMessages: The errAction was not defined, so we're doing nothing\n") - } - } } } + +func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once, natsConn *nats.Conn) { + // Create the initial header, and set values below depending on the + // various configuration options chosen. + natsMsgHeader := nats.Header{} + + // The serialized value of the nats message payload + var natsMsgPayloadSerialized []byte + + // encode the message structure into gob binary format before putting + // it into a nats message. + // Prepare a gob encoder with a buffer before we start the loop + switch p.configuration.Serialization { + case "cbor": + b, err := cbor.Marshal(m) + if err != nil { + er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err) + p.processes.errorKernel.errSend(p, m, er) + return + } + + natsMsgPayloadSerialized = b + natsMsgHeader["serial"] = []string{p.configuration.Serialization} + + default: + var bufGob bytes.Buffer + gobEnc := gob.NewEncoder(&bufGob) + err := gobEnc.Encode(m) + if err != nil { + er := fmt.Errorf("error: messageDeliverNats: gob encode message failed: %v", err) + p.processes.errorKernel.errSend(p, m, er) + return + } + + natsMsgPayloadSerialized = bufGob.Bytes() + } + + // Get the process name so we can look up the process in the + // processes map, and increment the message counter. + pn := processNameGet(p.subject.name(), processKindPublisher) + m.ID = p.messageID + + // The compressed value of the nats message payload. The content + // can either be compressed or in it's original form depening on + // the outcome of the switch below, and if compression were chosen + // or not. + var natsMsgPayloadCompressed []byte + + // Compress the data payload if selected with configuration flag. + // The compression chosen is later set in the nats msg header when + // calling p.messageDeliverNats below. + switch p.configuration.Compression { + case "z": // zstd + natsMsgPayloadCompressed = zEnc.EncodeAll(natsMsgPayloadSerialized, nil) + natsMsgHeader["cmp"] = []string{p.configuration.Compression} + + zEnc.Reset(nil) + + case "g": // gzip + var buf bytes.Buffer + gzipW := gzip.NewWriter(&buf) + _, err := gzipW.Write(natsMsgPayloadSerialized) + if err != nil { + log.Printf("error: failed to write gzip: %v\n", err) + gzipW.Close() + return + } + gzipW.Close() + + natsMsgPayloadCompressed = buf.Bytes() + natsMsgHeader["cmp"] = []string{p.configuration.Compression} + case "": // no compression + natsMsgPayloadCompressed = natsMsgPayloadSerialized + + default: // no compression + // Allways log the error to console. + er := fmt.Errorf("error: compression type not defined, setting default to zero compression") + log.Printf("%v\n", er) + + // We only wan't to send the error message to errorCentral once. + once.Do(func() { + p.processes.errorKernel.errSend(p, m, er) + }) + + natsMsgPayloadCompressed = natsMsgPayloadSerialized + } + + // Create the Nats message with headers and payload, and do the + // sending of the message. + p.messageDeliverNats(natsMsgPayloadCompressed, natsMsgHeader, natsConn, m) + + // Signaling back to the ringbuffer that we are done with the + // current message, and it can remove it from the ringbuffer. + m.done <- struct{}{} + + // Increment the counter for the next message to be sent. + p.messageID++ + + { + p.processes.active.mu.Lock() + p.processes.active.procNames[pn] = p + p.processes.active.mu.Unlock() + } + + // // Handle the error. + // // + // // NOTE: None of the processes above generate an error, so the the + // // if clause will never be triggered. But keeping it here as an example + // // for now for how to handle errors. + // if err != nil { + // // Create an error type which also creates a channel which the + // // errorKernel will send back the action about what to do. + // ep := errorEvent{ + // //errorType: logOnly, + // process: p, + // message: m, + // errorActionCh: make(chan errorAction), + // } + // p.errorCh <- ep + // + // // Wait for the response action back from the error kernel, and + // // decide what to do. Should we continue, quit, or .... ? + // switch <-ep.errorActionCh { + // case errActionContinue: + // // Just log and continue + // log.Printf("The errAction was continue...so we're continuing\n") + // case errActionKill: + // log.Printf("The errAction was kill...so we're killing\n") + // // .... + // default: + // log.Printf("Info: publishMessages: The errAction was not defined, so we're doing nothing\n") + // } + // } +}