diff --git a/command_event_type.go b/command_event_type.go index cac69b1..3181dba 100644 --- a/command_event_type.go +++ b/command_event_type.go @@ -6,7 +6,7 @@ package steward -// CommandOrEvent describes on the message level if this is +// Event describes on the message level if this is // an event or command kind of message in the Subject name. // This field is mainly used to be able to spawn up different // worker processes based on the Subject name so we can have @@ -15,15 +15,13 @@ package steward // This type is used in both building the subject name, and // also inside the Message type to describe if it is a Command // or Event. -type CommandOrEvent string +type Event string -func (c CommandOrEvent) GetCommandOrEventAvailable() CommandOrEventAvailable { - ma := CommandOrEventAvailable{ - topics: map[CommandOrEvent]struct{}{ - CommandACK: {}, - CommandNACK: {}, - EventACK: {}, - EventNACK: {}, +func (c Event) CheckEventAvailable() EventAvailable { + ma := EventAvailable{ + topics: map[Event]struct{}{ + EventACK: {}, + EventNACK: {}, }, } @@ -31,36 +29,26 @@ func (c CommandOrEvent) GetCommandOrEventAvailable() CommandOrEventAvailable { } const ( - // Command, command that will just wait for an - // ack, and nothing of the output of the command are - // delivered back in the reply ack message. - // The message should contain the unique ID of the - // command. - CommandACK CommandOrEvent = "CommandACK" + // EventACK, wait for the return of an ACK message. + // The sender will wait for an ACK reply message + // to decide if it was succesfully delivered or not. + // If no ACK was received within the timeout, the + // message will be resent the nr. of times specified + // in retries field of the message. + EventACK Event = "EventACK" // Same as above, but No ACK. - CommandNACK CommandOrEvent = "CommandNACK" - // Same as above, but No ACK - // Event, wait for and return the ACK message. This means - // that the command should be executed immediately - // and that we should get the confirmation if it - // was successful or not. - EventACK CommandOrEvent = "EventACK" - // Same as above, but No ACK. - EventNACK CommandOrEvent = "EventNACK" - // eventCommand, just wait for the ACK that the - // message is received. What action happens is up to the - // received to decide. + EventNACK Event = "EventNACK" ) // CommandOrEventAvailable are used for checking if the // commands or events are defined. -type CommandOrEventAvailable struct { - topics map[CommandOrEvent]struct{} +type EventAvailable struct { + topics map[Event]struct{} } // Check if a command or even exists. -func (co CommandOrEventAvailable) CheckIfExists(c CommandOrEvent, subject Subject) bool { - _, ok := co.topics[c] +func (e EventAvailable) CheckIfExists(event Event, subject Subject) bool { + _, ok := e.topics[event] if ok { // log.Printf("info: CommandOrEventAvailable.CheckIfExists: command or event found: %v, for %v\n", c, subject.name()) return true diff --git a/message_and_subject.go b/message_and_subject.go index e124849..e6e45e2 100644 --- a/message_and_subject.go +++ b/message_and_subject.go @@ -104,7 +104,7 @@ type Subject struct { // node, the name of the node to receive the message. ToNode string `json:"node" yaml:"toNode"` // messageType, command/event - CommandOrEvent CommandOrEvent `json:"commandOrEvent" yaml:"commandOrEvent"` + CommandOrEvent Event `json:"commandOrEvent" yaml:"commandOrEvent"` // method, what is this message doing, etc. CLICommand, Syslog, etc. Method Method `json:"method" yaml:"method"` // messageCh is used by publisher kind processes to read new messages @@ -120,7 +120,7 @@ type Subject struct { func newSubject(method Method, node string) Subject { // Get the CommandOrEvent type for the Method. ma := method.GetMethodsAvailable() - coe, ok := ma.Methodhandlers[method] + mh, ok := ma.Methodhandlers[method] if !ok { log.Printf("error: no CommandOrEvent type specified for the method: %v\n", method) os.Exit(1) @@ -128,7 +128,7 @@ func newSubject(method Method, node string) Subject { return Subject{ ToNode: node, - CommandOrEvent: coe.getKind(), + CommandOrEvent: mh.getKind(), Method: method, messageCh: make(chan Message), } diff --git a/process.go b/process.go index 0e9a70e..ff456f1 100644 --- a/process.go +++ b/process.go @@ -244,9 +244,9 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He Header: natsMsgHeader, } - // - - if p.subject.CommandOrEvent == CommandNACK || p.subject.CommandOrEvent == EventNACK { + // If it is a NACK message we just deliver the message and return + // here so we don't create a ACK message and then stop waiting for it. + if p.subject.CommandOrEvent == EventNACK { err := natsConn.PublishMsg(msg) if err != nil { er := fmt.Errorf("error: nats publish of hello failed: %v", err) @@ -260,7 +260,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He // The SubscribeSync used in the subscriber, will get messages that // are sent after it started subscribing. // - // Create a subscriber for the reply message. + // Create a subscriber for the ACK reply message. subReply, err := natsConn.SubscribeSync(msg.Reply) if err != nil { er := fmt.Errorf("error: nats SubscribeSync failed: failed to create reply message for subject: %v, error: %v", msg.Reply, err) @@ -283,7 +283,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He // If the message is an ACK type of message we must check that a // reply, and if it is not we don't wait here at all. - if p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK { + if p.subject.CommandOrEvent == EventACK { // Wait up until ACKTimeout specified for a reply, // continue and resend if no reply received, // or exit if max retries for the message reached. @@ -498,7 +498,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, switch { // Check for ACK type Commands or Event. - case p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK: + case p.subject.CommandOrEvent == EventACK: // Look up the method handler for the specified method. mh, ok := p.methodsAvailable.CheckIfExists(message.Method) if !ok { @@ -521,7 +521,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, natsConn.Publish(msg.Reply, out) // Check for NACK type Commands or Event. - case p.subject.CommandOrEvent == CommandNACK || p.subject.CommandOrEvent == EventNACK: + case p.subject.CommandOrEvent == EventNACK: mf, ok := p.methodsAvailable.CheckIfExists(message.Method) if !ok { er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent) diff --git a/requests.go b/requests.go index 7de975a..226971e 100644 --- a/requests.go +++ b/requests.go @@ -145,22 +145,22 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { ma := MethodsAvailable{ Methodhandlers: map[Method]methodHandler{ REQInitial: methodREQInitial{ - commandOrEvent: CommandACK, + commandOrEvent: EventACK, }, REQOpProcessList: methodREQOpProcessList{ - commandOrEvent: CommandACK, + commandOrEvent: EventACK, }, REQOpProcessStart: methodREQOpProcessStart{ - commandOrEvent: CommandACK, + commandOrEvent: EventACK, }, REQOpProcessStop: methodREQOpProcessStop{ - commandOrEvent: CommandACK, + commandOrEvent: EventACK, }, REQCliCommand: methodREQCliCommand{ - commandOrEvent: CommandACK, + commandOrEvent: EventACK, }, REQCliCommandCont: methodREQCliCommandCont{ - commandOrEvent: CommandACK, + commandOrEvent: EventACK, }, REQToConsole: methodREQToConsole{ commandOrEvent: EventACK, @@ -249,10 +249,10 @@ func getContextForMethodTimeout(ctx context.Context, message Message) (context.C // Initial parent method used to start other processes. type methodREQInitial struct { - commandOrEvent CommandOrEvent + commandOrEvent Event } -func (m methodREQInitial) getKind() CommandOrEvent { +func (m methodREQInitial) getKind() Event { return m.commandOrEvent } @@ -381,17 +381,17 @@ func selectFileNaming(message Message, proc process) (string, string) { // The methodHandler interface. type methodHandler interface { handler(proc process, message Message, node string) ([]byte, error) - getKind() CommandOrEvent + getKind() Event } // ----- // --- OpProcessList type methodREQOpProcessList struct { - commandOrEvent CommandOrEvent + commandOrEvent Event } -func (m methodREQOpProcessList) getKind() CommandOrEvent { +func (m methodREQOpProcessList) getKind() Event { return m.commandOrEvent } @@ -426,10 +426,10 @@ func (m methodREQOpProcessList) handler(proc process, message Message, node stri // --- OpProcessStart type methodREQOpProcessStart struct { - commandOrEvent CommandOrEvent + commandOrEvent Event } -func (m methodREQOpProcessStart) getKind() CommandOrEvent { +func (m methodREQOpProcessStart) getKind() Event { return m.commandOrEvent } @@ -482,10 +482,10 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str // --- OpProcessStop type methodREQOpProcessStop struct { - commandOrEvent CommandOrEvent + commandOrEvent Event } -func (m methodREQOpProcessStop) getKind() CommandOrEvent { +func (m methodREQOpProcessStop) getKind() Event { return m.commandOrEvent } @@ -588,10 +588,10 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri // ---- type methodREQToFileAppend struct { - commandOrEvent CommandOrEvent + commandOrEvent Event } -func (m methodREQToFileAppend) getKind() CommandOrEvent { +func (m methodREQToFileAppend) getKind() Event { return m.commandOrEvent } @@ -642,10 +642,10 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin // ----- type methodREQToFile struct { - commandOrEvent CommandOrEvent + commandOrEvent Event } -func (m methodREQToFile) getKind() CommandOrEvent { +func (m methodREQToFile) getKind() Event { return m.commandOrEvent } @@ -699,10 +699,10 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([] // ---- type methodREQCopyFileFrom struct { - commandOrEvent CommandOrEvent + commandOrEvent Event } -func (m methodREQCopyFileFrom) getKind() CommandOrEvent { +func (m methodREQCopyFileFrom) getKind() Event { return m.commandOrEvent } @@ -832,10 +832,10 @@ func copyFileFrom(ctx context.Context, wg *sync.WaitGroup, SrcFilePath string, e // ---- type methodREQCopyFileTo struct { - commandOrEvent CommandOrEvent + commandOrEvent Event } -func (m methodREQCopyFileTo) getKind() CommandOrEvent { +func (m methodREQCopyFileTo) getKind() Event { return m.commandOrEvent } @@ -947,10 +947,10 @@ func (m methodREQCopyFileTo) handler(proc process, message Message, node string) // ---- type methodREQHello struct { - commandOrEvent CommandOrEvent + commandOrEvent Event } -func (m methodREQHello) getKind() CommandOrEvent { +func (m methodREQHello) getKind() Event { return m.commandOrEvent } @@ -1001,10 +1001,10 @@ func (m methodREQHello) handler(proc process, message Message, node string) ([]b // --- type methodREQErrorLog struct { - commandOrEvent CommandOrEvent + commandOrEvent Event } -func (m methodREQErrorLog) getKind() CommandOrEvent { +func (m methodREQErrorLog) getKind() Event { return m.commandOrEvent } @@ -1050,10 +1050,10 @@ func (m methodREQErrorLog) handler(proc process, message Message, node string) ( // --- type methodREQPing struct { - commandOrEvent CommandOrEvent + commandOrEvent Event } -func (m methodREQPing) getKind() CommandOrEvent { +func (m methodREQPing) getKind() Event { return m.commandOrEvent } @@ -1114,10 +1114,10 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by // --- type methodREQPong struct { - commandOrEvent CommandOrEvent + commandOrEvent Event } -func (m methodREQPong) getKind() CommandOrEvent { +func (m methodREQPong) getKind() Event { return m.commandOrEvent } @@ -1171,10 +1171,10 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by // --- type methodREQCliCommand struct { - commandOrEvent CommandOrEvent + commandOrEvent Event } -func (m methodREQCliCommand) getKind() CommandOrEvent { +func (m methodREQCliCommand) getKind() Event { return m.commandOrEvent } @@ -1288,10 +1288,10 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string) // --- type methodREQToConsole struct { - commandOrEvent CommandOrEvent + commandOrEvent Event } -func (m methodREQToConsole) getKind() CommandOrEvent { +func (m methodREQToConsole) getKind() Event { return m.commandOrEvent } @@ -1321,10 +1321,10 @@ func (m methodREQToConsole) handler(proc process, message Message, node string) // --- type methodREQTuiToConsole struct { - commandOrEvent CommandOrEvent + commandOrEvent Event } -func (m methodREQTuiToConsole) getKind() CommandOrEvent { +func (m methodREQTuiToConsole) getKind() Event { return m.commandOrEvent } @@ -1345,10 +1345,10 @@ func (m methodREQTuiToConsole) handler(proc process, message Message, node strin // --- type methodREQHttpGet struct { - commandOrEvent CommandOrEvent + commandOrEvent Event } -func (m methodREQHttpGet) getKind() CommandOrEvent { +func (m methodREQHttpGet) getKind() Event { return m.commandOrEvent } @@ -1444,10 +1444,10 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([ // --- methodREQTailFile type methodREQTailFile struct { - commandOrEvent CommandOrEvent + commandOrEvent Event } -func (m methodREQTailFile) getKind() CommandOrEvent { +func (m methodREQTailFile) getKind() Event { return m.commandOrEvent } @@ -1537,10 +1537,10 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) ( // --- type methodREQCliCommandCont struct { - commandOrEvent CommandOrEvent + commandOrEvent Event } -func (m methodREQCliCommandCont) getKind() CommandOrEvent { +func (m methodREQCliCommandCont) getKind() Event { return m.commandOrEvent } @@ -1670,10 +1670,10 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str // --- type methodREQToSocket struct { - commandOrEvent CommandOrEvent + commandOrEvent Event } -func (m methodREQToSocket) getKind() CommandOrEvent { +func (m methodREQToSocket) getKind() Event { return m.commandOrEvent } @@ -1693,10 +1693,10 @@ func (m methodREQToSocket) handler(proc process, message Message, node string) ( // ---- type methodREQRelayInitial struct { - commandOrEvent CommandOrEvent + commandOrEvent Event } -func (m methodREQRelayInitial) getKind() CommandOrEvent { +func (m methodREQRelayInitial) getKind() Event { return m.commandOrEvent } @@ -1797,10 +1797,10 @@ func (m methodREQRelayInitial) handler(proc process, message Message, node strin // ---- type methodREQRelay struct { - commandOrEvent CommandOrEvent + commandOrEvent Event } -func (m methodREQRelay) getKind() CommandOrEvent { +func (m methodREQRelay) getKind() Event { return m.commandOrEvent } diff --git a/ringbuffer.go b/ringbuffer.go index 88eeabf..96ae2d4 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -149,12 +149,12 @@ func (r *ringBuffer) fillBuffer(ctx context.Context, inCh chan subjectAndMessage }() // Prepare the map structure to know what values are allowed - // for the commands or events - var coe CommandOrEvent - coeAvailable := coe.GetCommandOrEventAvailable() - coeAvailableValues := []CommandOrEvent{} - for v := range coeAvailable.topics { - coeAvailableValues = append(coeAvailableValues, v) + // for the events + var event Event + eventAvailable := event.CheckEventAvailable() + eventAvailableValues := []Event{} + for v := range eventAvailable.topics { + eventAvailableValues = append(eventAvailableValues, v) } // Check for incomming messages. These are typically comming from @@ -163,8 +163,8 @@ func (r *ringBuffer) fillBuffer(ctx context.Context, inCh chan subjectAndMessage select { case v := <-inCh: // Check if the command or event exists in commandOrEvent.go - if !coeAvailable.CheckIfExists(v.CommandOrEvent, v.Subject) { - er := fmt.Errorf("error: fillBuffer: the event or command type do not exist, so this message will not be put on the buffer to be processed. Check the syntax used in the json file for the message. Allowed values are : %v, where given: coe=%v, with subject=%v", coeAvailableValues, v.CommandOrEvent, v.Subject) + if !eventAvailable.CheckIfExists(v.CommandOrEvent, v.Subject) { + er := fmt.Errorf("error: fillBuffer: the event or command type do not exist, so this message will not be put on the buffer to be processed. Check the syntax used in the json file for the message. Allowed values are : %v, where given: event=%v, with subject=%v", eventAvailableValues, v.CommandOrEvent, v.Subject) r.errorKernel.errSend(r.processInitial, Message{}, er) // if it was not a valid value, we jump back up, and diff --git a/server.go b/server.go index 1e5fa67..57851a7 100644 --- a/server.go +++ b/server.go @@ -390,8 +390,8 @@ func (s *server) routeMessagesToProcess(dbFileName string) { // send if there are a specific subject for it, and if no subject // exist throw an error. - var coe CommandOrEvent - coeAvailable := coe.GetCommandOrEventAvailable() + var event Event + eventAvailable := event.CheckEventAvailable() var method Method methodsAvailable := method.GetMethodsAvailable() @@ -408,7 +408,7 @@ func (s *server) routeMessagesToProcess(dbFileName string) { s.errorKernel.errSend(s.processInitial, sam.Message, er) continue } - if !coeAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) { + if !eventAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) { er := fmt.Errorf("error: routeMessagesToProcess: the command or event do not exist, message dropped: %v", sam.Message.Method) s.errorKernel.errSend(s.processInitial, sam.Message, er)