1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

using the same message ID locally as in the ringbuffer

This commit is contained in:
postmannen 2022-12-31 07:31:18 +01:00
parent 84284eeee1
commit 5865ad3701
2 changed files with 9 additions and 4 deletions

View file

@ -287,7 +287,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
Header: natsMsgHeader, Header: natsMsgHeader,
} }
er := fmt.Errorf("info: preparing to send nats message with subject %v ", msg.Subject) er := fmt.Errorf("info: preparing to send nats message with subject %v, id: %v", msg.Subject, message.ID)
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
var err error var err error
@ -305,7 +305,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
} }
p.metrics.promNatsDeliveredTotal.Inc() p.metrics.promNatsDeliveredTotal.Inc()
er := fmt.Errorf("info: sent nats message with subject %v ", msg.Subject) er := fmt.Errorf("info: sent nats message with subject %v, id: %v", msg.Subject, message.ID)
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
//err = natsConn.Flush() //err = natsConn.Flush()
@ -430,7 +430,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
// Message were delivered successfully. // Message were delivered successfully.
p.metrics.promNatsDeliveredTotal.Inc() p.metrics.promNatsDeliveredTotal.Inc()
er = fmt.Errorf("info: sent nats message with subject %v ", msg.Subject) er = fmt.Errorf("info: sent nats message with subject %v, id: %v", msg.Subject, message.ID)
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
return return
@ -906,7 +906,11 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once,
// Get the process name so we can look up the process in the // Get the process name so we can look up the process in the
// processes map, and increment the message counter. // processes map, and increment the message counter.
pn := processNameGet(p.subject.name(), processKindPublisher) pn := processNameGet(p.subject.name(), processKindPublisher)
m.ID = p.messageID
// NB: REMOVED: It doesn't really make sense to get the message id
// from the process. Implemented so this is picked up from the id
// used in the ringbuffer.
// m.ID = p.messageID
// The compressed value of the nats message payload. The content // The compressed value of the nats message payload. The content
// can either be compressed or in it's original form depening on // can either be compressed or in it's original form depening on

View file

@ -256,6 +256,7 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB
select { select {
case v := <-r.bufData: case v := <-r.bufData:
r.metrics.promInMemoryBufferMessagesCurrent.Set(float64(len(r.bufData))) r.metrics.promInMemoryBufferMessagesCurrent.Set(float64(len(r.bufData)))
v.Data.ID = v.ID
// Create a done channel per message. A process started by the // Create a done channel per message. A process started by the
// spawnProcess function will handle incomming messages sequentaly. // spawnProcess function will handle incomming messages sequentaly.