diff --git a/errorkernel.go b/errorkernel.go index 93bd6bc..9fe2d83 100644 --- a/errorkernel.go +++ b/errorkernel.go @@ -71,18 +71,20 @@ func (e *errorKernel) start(ringBufferBulkInCh chan<- []subjectAndMessage) error er := fmt.Sprintf("%v, node: %v, %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), errEvent.process.node, errEvent.err) + m := Message{ + Directory: "errorLog", + ToNode: "errorCentral", + FromNode: errEvent.process.node, + FileName: "error.log", + Data: []byte(er), + Method: REQErrorLog, + ACKTimeout: errEvent.process.configuration.ErrorMessageTimeout, + Retries: errEvent.process.configuration.ErrorMessageRetries, + } + sam := subjectAndMessage{ Subject: newSubject(REQErrorLog, "errorCentral"), - Message: Message{ - Directory: "errorLog", - ToNode: "errorCentral", - FromNode: errEvent.process.node, - FileName: "error.log", - Data: []byte(er), - Method: REQErrorLog, - ACKTimeout: errEvent.process.configuration.ErrorMessageTimeout, - Retries: errEvent.process.configuration.ErrorMessageRetries, - }, + Message: m, } // Put the message on the channel to the ringbuffer. diff --git a/event_type.go b/event_type.go index 34392d2..543123f 100644 --- a/event_type.go +++ b/event_type.go @@ -15,17 +15,6 @@ package steward // ACK or NACK it is. type Event string -func (c Event) CheckEventAvailable() EventAvailable { - ma := EventAvailable{ - topics: map[Event]struct{}{ - EventACK: {}, - EventNACK: {}, - }, - } - - return ma -} - const ( // EventACK, wait for the return of an ACK message. // The sender will wait for an ACK reply message @@ -37,21 +26,3 @@ const ( // Same as above, but No ACK. EventNACK Event = "EventNACK" ) - -// EventAvailable are used for checking if the -// events are defined. -type EventAvailable struct { - topics map[Event]struct{} -} - -// Check if an event exists. -func (e EventAvailable) CheckIfExists(event Event, subject Subject) bool { - _, ok := e.topics[event] - if ok { - // log.Printf("info: EventAvailable.CheckIfExists: event found: %v, for %v\n", c, subject.name()) - return true - } else { - // log.Printf("error: EventAvailable.CheckIfExists: event not found: %v, for %v\n", c, subject.name()) - return false - } -} diff --git a/process.go b/process.go index 1d2b920..c115855 100644 --- a/process.go +++ b/process.go @@ -295,7 +295,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He switch { // If it is a NACK message we just deliver the message and return // here so we don't create a ACK message and then stop waiting for it. - case p.subject.Event == EventNACK: + case message.ACKTimeout < 1: err = func() error { err := natsConn.PublishMsg(msg) if err != nil { @@ -318,7 +318,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He return nil }() - case p.subject.Event == EventACK: + case message.ACKTimeout >= 1: // The function below will return nil if the message should not be retried. // // All other errors happening will return ErrACKSubscribeRetry which will lead @@ -559,7 +559,8 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, switch { // Check for ACK type Event. - case p.subject.Event == EventACK: + case message.ACKTimeout >= 1: + fmt.Println("-------------------- subscriberHandler ACK-----------------------") // When spawning sub processes we can directly assign handlers to the process upon // creation. We here check if a handler is already assigned, and if it is nil, we // lookup and find the correct handler to use if available. @@ -579,10 +580,11 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, // Send a confirmation message back to the publisher to ACK that the // message was received by the subscriber. The reply should be sent - //no matter if the handler was executed successfully or not + // no matter if the handler was executed successfully or not natsConn.Publish(msg.Reply, []byte{}) - case p.subject.Event == EventNACK: + case message.ACKTimeout < 1: + fmt.Println("-------------------- subscriberHandler NACK-----------------------") // When spawning sub processes we can directly assign handlers to the process upon // creation. We here check if a handler is already assigned, and if it is nil, we // lookup and find the correct handler to use if available. diff --git a/processes.go b/processes.go index 3329bc3..60ba8a0 100644 --- a/processes.go +++ b/processes.go @@ -300,7 +300,7 @@ func (s startup) pubREQHello(p process) { FromNode: Node(p.node), Data: []byte(d), Method: REQHello, - ACKTimeout: 10, + ACKTimeout: proc.configuration.DefaultMessageTimeout, Retries: 1, } diff --git a/requests_keys.go b/requests_keys.go index e1c55ad..d9eaf2c 100644 --- a/requests_keys.go +++ b/requests_keys.go @@ -356,6 +356,7 @@ func pushKeys(proc process, message Message, nodes []Node) error { ToNode: n, Method: REQKeysDeliverUpdate, ReplyMethod: REQNone, + ACKTimeout: 0, } sam, err := newSubjectAndMessage(msg) @@ -402,6 +403,7 @@ func pushKeys(proc process, message Message, nodes []Node) error { Method: REQKeysDeliverUpdate, Data: b, ReplyMethod: REQNone, + ACKTimeout: 0, } sam, err := newSubjectAndMessage(msg) diff --git a/ringbuffer.go b/ringbuffer.go index d67ebd6..df5ab6c 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -162,35 +162,22 @@ func (r *ringBuffer) fillBuffer(ctx context.Context, inCh chan subjectAndMessage }() } - // Prepare the map structure to know what values are allowed - // for the events - var event Event - eventAvailable := event.CheckEventAvailable() - eventAvailableValues := []Event{} - for v := range eventAvailable.topics { - eventAvailableValues = append(eventAvailableValues, v) - } - // Check for incomming messages. These are typically comming from // the go routine who reads the socket. for { select { case sam := <-inCh: - // Check if the event exists. - if !eventAvailable.CheckIfExists(sam.Event, sam.Subject) { - er := fmt.Errorf("error: fillBuffer: the event type do not exist, so this message will not be put on the buffer to be processed. Check the syntax used in the json file for the message. Allowed values are : %v, where given: event=%v, with subject=%v", eventAvailableValues, sam.Event, sam.Subject) - r.errorKernel.errSend(r.processInitial, Message{}, er) - - // if it was not a valid value, we jump back up, and - // continue the range iteration. - continue - } // Check if default message values for timers are set, and if // not then set default message values. if sam.Message.ACKTimeout < 1 { - sam.Message.ACKTimeout = r.configuration.DefaultMessageTimeout + sam.Subject.Event = EventNACK } + if sam.Message.ACKTimeout >= 1 { + sam.Subject.Event = EventNACK + } + + // TODO: Make so 0 is an usable option for retries. if sam.Message.Retries < 1 { sam.Message.Retries = r.configuration.DefaultMessageRetries } @@ -258,11 +245,11 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB r.metrics.promInMemoryBufferMessagesCurrent.Set(float64(len(r.bufData))) samDBv.SAM.ID = samDBv.ID - // Create a done channel per message. A process started by the - // spawnProcess function will handle incomming messages sequentaly. - // So in the spawnProcess function we put a struct{} value when a - // message is processed on the "done" channel and an ack is received - // for a message, and we wait here for the "done" to be received. + // // Create a done channel per message. A process started by the + // // spawnProcess function will handle incomming messages sequentaly. + // // So in the spawnProcess function we put a struct{} value when a + // // message is processed on the "done" channel and an ack is received + // // for a message, and we wait here for the "done" to be received. // We start the actual processing of an individual message here within // it's own go routine. Reason is that we don't want to block other @@ -294,8 +281,8 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB }, } - ticker := time.NewTicker(time.Duration(v.SAM.ACKTimeout) * time.Duration(v.SAM.Retries) * 2 * time.Second) - defer ticker.Stop() + // ticker := time.NewTicker(time.Duration(v.SAM.ACKTimeout) * time.Duration(v.SAM.Retries) * 2 * time.Second) + // defer ticker.Stop() outCh <- sd // Just to confirm here that the message was picked up, to know if the diff --git a/server.go b/server.go index ef186e2..76cfdc6 100644 --- a/server.go +++ b/server.go @@ -435,9 +435,6 @@ func (s *server) routeMessagesToProcess(dbFileName string) { // send if there are a specific subject for it, and if no subject // exist throw an error. - var event Event - eventAvailable := event.CheckEventAvailable() - var method Method methodsAvailable := method.GetMethodsAvailable() @@ -454,12 +451,6 @@ func (s *server) routeMessagesToProcess(dbFileName string) { s.errorKernel.errSend(s.processInitial, sam.Message, er) return } - if !eventAvailable.CheckIfExists(sam.Subject.Event, sam.Subject) { - er := fmt.Errorf("error: routeMessagesToProcess: the event type do not exist, message dropped: %v", sam.Message.Method) - s.errorKernel.errSend(s.processInitial, sam.Message, er) - - return - } m := sam.Message