1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

initially removed dependency to NACK/ACK event type

This commit is contained in:
postmannen 2023-01-05 01:55:52 +01:00
parent 4fe96b645e
commit eced76a85c
7 changed files with 35 additions and 80 deletions

View file

@ -71,9 +71,7 @@ func (e *errorKernel) start(ringBufferBulkInCh chan<- []subjectAndMessage) error
er := fmt.Sprintf("%v, node: %v, %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), errEvent.process.node, errEvent.err) er := fmt.Sprintf("%v, node: %v, %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), errEvent.process.node, errEvent.err)
sam := subjectAndMessage{ m := Message{
Subject: newSubject(REQErrorLog, "errorCentral"),
Message: Message{
Directory: "errorLog", Directory: "errorLog",
ToNode: "errorCentral", ToNode: "errorCentral",
FromNode: errEvent.process.node, FromNode: errEvent.process.node,
@ -82,7 +80,11 @@ func (e *errorKernel) start(ringBufferBulkInCh chan<- []subjectAndMessage) error
Method: REQErrorLog, Method: REQErrorLog,
ACKTimeout: errEvent.process.configuration.ErrorMessageTimeout, ACKTimeout: errEvent.process.configuration.ErrorMessageTimeout,
Retries: errEvent.process.configuration.ErrorMessageRetries, Retries: errEvent.process.configuration.ErrorMessageRetries,
}, }
sam := subjectAndMessage{
Subject: newSubject(REQErrorLog, "errorCentral"),
Message: m,
} }
// Put the message on the channel to the ringbuffer. // Put the message on the channel to the ringbuffer.

View file

@ -15,17 +15,6 @@ package steward
// ACK or NACK it is. // ACK or NACK it is.
type Event string type Event string
func (c Event) CheckEventAvailable() EventAvailable {
ma := EventAvailable{
topics: map[Event]struct{}{
EventACK: {},
EventNACK: {},
},
}
return ma
}
const ( const (
// EventACK, wait for the return of an ACK message. // EventACK, wait for the return of an ACK message.
// The sender will wait for an ACK reply message // The sender will wait for an ACK reply message
@ -37,21 +26,3 @@ const (
// Same as above, but No ACK. // Same as above, but No ACK.
EventNACK Event = "EventNACK" EventNACK Event = "EventNACK"
) )
// EventAvailable are used for checking if the
// events are defined.
type EventAvailable struct {
topics map[Event]struct{}
}
// Check if an event exists.
func (e EventAvailable) CheckIfExists(event Event, subject Subject) bool {
_, ok := e.topics[event]
if ok {
// log.Printf("info: EventAvailable.CheckIfExists: event found: %v, for %v\n", c, subject.name())
return true
} else {
// log.Printf("error: EventAvailable.CheckIfExists: event not found: %v, for %v\n", c, subject.name())
return false
}
}

View file

@ -295,7 +295,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
switch { switch {
// If it is a NACK message we just deliver the message and return // If it is a NACK message we just deliver the message and return
// here so we don't create a ACK message and then stop waiting for it. // here so we don't create a ACK message and then stop waiting for it.
case p.subject.Event == EventNACK: case message.ACKTimeout < 1:
err = func() error { err = func() error {
err := natsConn.PublishMsg(msg) err := natsConn.PublishMsg(msg)
if err != nil { if err != nil {
@ -318,7 +318,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
return nil return nil
}() }()
case p.subject.Event == EventACK: case message.ACKTimeout >= 1:
// The function below will return nil if the message should not be retried. // The function below will return nil if the message should not be retried.
// //
// All other errors happening will return ErrACKSubscribeRetry which will lead // All other errors happening will return ErrACKSubscribeRetry which will lead
@ -559,7 +559,8 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
switch { switch {
// Check for ACK type Event. // Check for ACK type Event.
case p.subject.Event == EventACK: case message.ACKTimeout >= 1:
fmt.Println("-------------------- subscriberHandler ACK-----------------------")
// When spawning sub processes we can directly assign handlers to the process upon // When spawning sub processes we can directly assign handlers to the process upon
// creation. We here check if a handler is already assigned, and if it is nil, we // creation. We here check if a handler is already assigned, and if it is nil, we
// lookup and find the correct handler to use if available. // lookup and find the correct handler to use if available.
@ -582,7 +583,8 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
// no matter if the handler was executed successfully or not // no matter if the handler was executed successfully or not
natsConn.Publish(msg.Reply, []byte{}) natsConn.Publish(msg.Reply, []byte{})
case p.subject.Event == EventNACK: case message.ACKTimeout < 1:
fmt.Println("-------------------- subscriberHandler NACK-----------------------")
// When spawning sub processes we can directly assign handlers to the process upon // When spawning sub processes we can directly assign handlers to the process upon
// creation. We here check if a handler is already assigned, and if it is nil, we // creation. We here check if a handler is already assigned, and if it is nil, we
// lookup and find the correct handler to use if available. // lookup and find the correct handler to use if available.

View file

@ -300,7 +300,7 @@ func (s startup) pubREQHello(p process) {
FromNode: Node(p.node), FromNode: Node(p.node),
Data: []byte(d), Data: []byte(d),
Method: REQHello, Method: REQHello,
ACKTimeout: 10, ACKTimeout: proc.configuration.DefaultMessageTimeout,
Retries: 1, Retries: 1,
} }

View file

@ -356,6 +356,7 @@ func pushKeys(proc process, message Message, nodes []Node) error {
ToNode: n, ToNode: n,
Method: REQKeysDeliverUpdate, Method: REQKeysDeliverUpdate,
ReplyMethod: REQNone, ReplyMethod: REQNone,
ACKTimeout: 0,
} }
sam, err := newSubjectAndMessage(msg) sam, err := newSubjectAndMessage(msg)
@ -402,6 +403,7 @@ func pushKeys(proc process, message Message, nodes []Node) error {
Method: REQKeysDeliverUpdate, Method: REQKeysDeliverUpdate,
Data: b, Data: b,
ReplyMethod: REQNone, ReplyMethod: REQNone,
ACKTimeout: 0,
} }
sam, err := newSubjectAndMessage(msg) sam, err := newSubjectAndMessage(msg)

View file

@ -162,35 +162,22 @@ func (r *ringBuffer) fillBuffer(ctx context.Context, inCh chan subjectAndMessage
}() }()
} }
// Prepare the map structure to know what values are allowed
// for the events
var event Event
eventAvailable := event.CheckEventAvailable()
eventAvailableValues := []Event{}
for v := range eventAvailable.topics {
eventAvailableValues = append(eventAvailableValues, v)
}
// Check for incomming messages. These are typically comming from // Check for incomming messages. These are typically comming from
// the go routine who reads the socket. // the go routine who reads the socket.
for { for {
select { select {
case sam := <-inCh: case sam := <-inCh:
// Check if the event exists.
if !eventAvailable.CheckIfExists(sam.Event, sam.Subject) {
er := fmt.Errorf("error: fillBuffer: the event type do not exist, so this message will not be put on the buffer to be processed. Check the syntax used in the json file for the message. Allowed values are : %v, where given: event=%v, with subject=%v", eventAvailableValues, sam.Event, sam.Subject)
r.errorKernel.errSend(r.processInitial, Message{}, er)
// if it was not a valid value, we jump back up, and
// continue the range iteration.
continue
}
// Check if default message values for timers are set, and if // Check if default message values for timers are set, and if
// not then set default message values. // not then set default message values.
if sam.Message.ACKTimeout < 1 { if sam.Message.ACKTimeout < 1 {
sam.Message.ACKTimeout = r.configuration.DefaultMessageTimeout sam.Subject.Event = EventNACK
} }
if sam.Message.ACKTimeout >= 1 {
sam.Subject.Event = EventNACK
}
// TODO: Make so 0 is an usable option for retries.
if sam.Message.Retries < 1 { if sam.Message.Retries < 1 {
sam.Message.Retries = r.configuration.DefaultMessageRetries sam.Message.Retries = r.configuration.DefaultMessageRetries
} }
@ -258,11 +245,11 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB
r.metrics.promInMemoryBufferMessagesCurrent.Set(float64(len(r.bufData))) r.metrics.promInMemoryBufferMessagesCurrent.Set(float64(len(r.bufData)))
samDBv.SAM.ID = samDBv.ID samDBv.SAM.ID = samDBv.ID
// Create a done channel per message. A process started by the // // Create a done channel per message. A process started by the
// spawnProcess function will handle incomming messages sequentaly. // // spawnProcess function will handle incomming messages sequentaly.
// So in the spawnProcess function we put a struct{} value when a // // So in the spawnProcess function we put a struct{} value when a
// message is processed on the "done" channel and an ack is received // // message is processed on the "done" channel and an ack is received
// for a message, and we wait here for the "done" to be received. // // for a message, and we wait here for the "done" to be received.
// We start the actual processing of an individual message here within // We start the actual processing of an individual message here within
// it's own go routine. Reason is that we don't want to block other // it's own go routine. Reason is that we don't want to block other
@ -294,8 +281,8 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB
}, },
} }
ticker := time.NewTicker(time.Duration(v.SAM.ACKTimeout) * time.Duration(v.SAM.Retries) * 2 * time.Second) // ticker := time.NewTicker(time.Duration(v.SAM.ACKTimeout) * time.Duration(v.SAM.Retries) * 2 * time.Second)
defer ticker.Stop() // defer ticker.Stop()
outCh <- sd outCh <- sd
// Just to confirm here that the message was picked up, to know if the // Just to confirm here that the message was picked up, to know if the

View file

@ -435,9 +435,6 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
// send if there are a specific subject for it, and if no subject // send if there are a specific subject for it, and if no subject
// exist throw an error. // exist throw an error.
var event Event
eventAvailable := event.CheckEventAvailable()
var method Method var method Method
methodsAvailable := method.GetMethodsAvailable() methodsAvailable := method.GetMethodsAvailable()
@ -454,12 +451,6 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
s.errorKernel.errSend(s.processInitial, sam.Message, er) s.errorKernel.errSend(s.processInitial, sam.Message, er)
return return
} }
if !eventAvailable.CheckIfExists(sam.Subject.Event, sam.Subject) {
er := fmt.Errorf("error: routeMessagesToProcess: the event type do not exist, message dropped: %v", sam.Message.Method)
s.errorKernel.errSend(s.processInitial, sam.Message, er)
return
}
m := sam.Message m := sam.Message