diff --git a/process.go b/process.go index ed45321..7c67a38 100644 --- a/process.go +++ b/process.go @@ -287,7 +287,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He Header: natsMsgHeader, } - er := fmt.Errorf("info: preparing to send nats message with subject %v ", msg.Subject) + er := fmt.Errorf("info: preparing to send nats message with subject %v, id: %v", msg.Subject, message.ID) p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) var err error @@ -305,7 +305,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He } p.metrics.promNatsDeliveredTotal.Inc() - er := fmt.Errorf("info: sent nats message with subject %v ", msg.Subject) + er := fmt.Errorf("info: sent nats message with subject %v, id: %v", msg.Subject, message.ID) p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) //err = natsConn.Flush() @@ -430,7 +430,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He // Message were delivered successfully. p.metrics.promNatsDeliveredTotal.Inc() - er = fmt.Errorf("info: sent nats message with subject %v ", msg.Subject) + er = fmt.Errorf("info: sent nats message with subject %v, id: %v", msg.Subject, message.ID) p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) return @@ -906,7 +906,11 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once, // Get the process name so we can look up the process in the // processes map, and increment the message counter. pn := processNameGet(p.subject.name(), processKindPublisher) - m.ID = p.messageID + + // NB: REMOVED: It doesn't really make sense to get the message id + // from the process. Implemented so this is picked up from the id + // used in the ringbuffer. + // m.ID = p.messageID // The compressed value of the nats message payload. The content // can either be compressed or in it's original form depening on diff --git a/ringbuffer.go b/ringbuffer.go index 9550867..fb8401a 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -256,6 +256,7 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB select { case v := <-r.bufData: r.metrics.promInMemoryBufferMessagesCurrent.Set(float64(len(r.bufData))) + v.Data.ID = v.ID // Create a done channel per message. A process started by the // spawnProcess function will handle incomming messages sequentaly.