mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
removed done channel when publishing, and added make chan when creating sam
This commit is contained in:
parent
2eff3c9cd3
commit
6a97d86ca0
4 changed files with 18 additions and 24 deletions
|
@ -434,9 +434,10 @@ func newSubjectAndMessage(m Message) (subjectAndMessage, error) {
|
|||
}
|
||||
|
||||
sub := Subject{
|
||||
ToNode: string(m.ToNode),
|
||||
Event: tmpH.getKind(),
|
||||
Method: m.Method,
|
||||
ToNode: string(m.ToNode),
|
||||
Event: tmpH.getKind(),
|
||||
Method: m.Method,
|
||||
messageCh: make(chan Message),
|
||||
}
|
||||
|
||||
sam := subjectAndMessage{
|
||||
|
|
14
process.go
14
process.go
|
@ -976,13 +976,13 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once,
|
|||
// sending of the message.
|
||||
p.messageDeliverNats(natsMsgPayloadCompressed, natsMsgHeader, natsConn, m)
|
||||
|
||||
select {
|
||||
case m.done <- struct{}{}:
|
||||
// Signaling back to the ringbuffer that we are done with the
|
||||
// current message, and it can remove it from the ringbuffer.
|
||||
case <-p.ctx.Done():
|
||||
return
|
||||
}
|
||||
// select {
|
||||
// case m.done <- struct{}{}:
|
||||
// // Signaling back to the ringbuffer that we are done with the
|
||||
// // current message, and it can remove it from the ringbuffer.
|
||||
// case <-p.ctx.Done():
|
||||
// return
|
||||
// }
|
||||
|
||||
// Increment the counter for the next message to be sent.
|
||||
p.messageID++
|
||||
|
|
|
@ -322,13 +322,13 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB
|
|||
// we don't get stuck go routines here.
|
||||
//
|
||||
// TODO: Figure out why what the reason for not receceving the done signals might be.
|
||||
select {
|
||||
case <-v.SAM.done:
|
||||
case <-ticker.C:
|
||||
log.Printf("----------------------------------------------\n")
|
||||
log.Printf("Error: ringBuffer message id: %v, subject: %v seems to be stuck, did not receive done signal from publishAMessage process, exited on ticker\n", v.SAM.ID, v.SAM.Subject)
|
||||
log.Printf("----------------------------------------------\n")
|
||||
}
|
||||
// select {
|
||||
// case <-v.SAM.done:
|
||||
// case <-ticker.C:
|
||||
// log.Printf("----------------------------------------------\n")
|
||||
// log.Printf("Error: ringBuffer message id: %v, subject: %v seems to be stuck, did not receive done signal from publishAMessage process, exited on ticker\n", v.SAM.ID, v.SAM.Subject)
|
||||
// log.Printf("----------------------------------------------\n")
|
||||
// }
|
||||
// log.Printf("info: processBufferMessages: done with message, deleting key from bucket, %v\n", v.ID)
|
||||
r.metrics.promMessagesProcessedIDLast.Set(float64(v.ID))
|
||||
|
||||
|
|
|
@ -461,18 +461,11 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
|
|||
return
|
||||
}
|
||||
|
||||
// Looping here so we are able to redo the sending
|
||||
// of the last message if a process for the specified subject
|
||||
// is not present. The process will then be created, and
|
||||
// the code will loop back here.
|
||||
|
||||
m := sam.Message
|
||||
|
||||
subjName := sam.Subject.name()
|
||||
pn := processNameGet(subjName, processKindPublisher)
|
||||
|
||||
// Check if there is a map of type map[int]process registered
|
||||
// for the processName, and if it exists then return it.
|
||||
sendOK := func() bool {
|
||||
var ctxCanceled bool
|
||||
|
||||
|
|
Loading…
Reference in a new issue