mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
renamed commandorevent field name in subject struct
This commit is contained in:
parent
d7bee9d361
commit
e7e5726095
9 changed files with 94 additions and 103 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -14,3 +14,4 @@ ships/
|
|||
testing_messages/
|
||||
test.file
|
||||
doc/concept/via/README.md
|
||||
notes.txt
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
// NB:
|
||||
// When adding new constants for the Method or CommandOrEvent
|
||||
// When adding new constants for the Method or event
|
||||
// types, make sure to also add them to the map
|
||||
// <Method/CommandOrEvent>Available since the this will be used
|
||||
// <Method/Event>Available since the this will be used
|
||||
// to check if the message values are valid later on.
|
||||
|
||||
package steward
|
||||
|
@ -40,8 +40,8 @@ const (
|
|||
EventNACK Event = "EventNACK"
|
||||
)
|
||||
|
||||
// CommandOrEventAvailable are used for checking if the
|
||||
// commands or events are defined.
|
||||
// EventAvailable are used for checking if the
|
||||
// events are defined.
|
||||
type EventAvailable struct {
|
||||
topics map[Event]struct{}
|
||||
}
|
||||
|
@ -50,10 +50,10 @@ type EventAvailable struct {
|
|||
func (e EventAvailable) CheckIfExists(event Event, subject Subject) bool {
|
||||
_, ok := e.topics[event]
|
||||
if ok {
|
||||
// log.Printf("info: CommandOrEventAvailable.CheckIfExists: command or event found: %v, for %v\n", c, subject.name())
|
||||
// log.Printf("info: EventAvailable.CheckIfExists: command or event found: %v, for %v\n", c, subject.name())
|
||||
return true
|
||||
} else {
|
||||
// log.Printf("error: CommandOrEventAvailable.CheckIfExists: command or event not found: %v, for %v\n", c, subject.name())
|
||||
// log.Printf("error: EventAvailable.CheckIfExists: command or event not found: %v, for %v\n", c, subject.name())
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,10 +0,0 @@
|
|||
[
|
||||
{
|
||||
"directory": "random_text_log",
|
||||
"fileName": "somefile.log",
|
||||
"toNode": "central",
|
||||
"data": ["some message sent from a ship for testing\n"],
|
||||
"commandOrEvent":"EventACK",
|
||||
"method":"REQToFileAppend"
|
||||
}
|
||||
]
|
|
@ -104,7 +104,7 @@ type Subject struct {
|
|||
// node, the name of the node to receive the message.
|
||||
ToNode string `json:"node" yaml:"toNode"`
|
||||
// messageType, command/event
|
||||
CommandOrEvent Event `json:"commandOrEvent" yaml:"commandOrEvent"`
|
||||
Event Event `json:"event" yaml:"event"`
|
||||
// method, what is this message doing, etc. CLICommand, Syslog, etc.
|
||||
Method Method `json:"method" yaml:"method"`
|
||||
// messageCh is used by publisher kind processes to read new messages
|
||||
|
@ -118,19 +118,19 @@ type Subject struct {
|
|||
// all the values given as arguments. It will also create the channel
|
||||
// to receive new messages on the specific subject.
|
||||
func newSubject(method Method, node string) Subject {
|
||||
// Get the CommandOrEvent type for the Method.
|
||||
// Get the Event type for the Method.
|
||||
ma := method.GetMethodsAvailable()
|
||||
mh, ok := ma.Methodhandlers[method]
|
||||
if !ok {
|
||||
log.Printf("error: no CommandOrEvent type specified for the method: %v\n", method)
|
||||
log.Printf("error: no Event type specified for the method: %v\n", method)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
return Subject{
|
||||
ToNode: node,
|
||||
CommandOrEvent: mh.getKind(),
|
||||
Method: method,
|
||||
messageCh: make(chan Message),
|
||||
ToNode: node,
|
||||
Event: mh.getKind(),
|
||||
Method: method,
|
||||
messageCh: make(chan Message),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -139,5 +139,5 @@ type subjectName string
|
|||
|
||||
// Return a value of the subjectName for the subject as used with nats subject.
|
||||
func (s Subject) name() subjectName {
|
||||
return subjectName(fmt.Sprintf("%s.%s.%s", s.ToNode, s.Method, s.CommandOrEvent))
|
||||
return subjectName(fmt.Sprintf("%s.%s.%s", s.ToNode, s.Method, s.Event))
|
||||
}
|
||||
|
|
|
@ -410,9 +410,9 @@ func newSubjectAndMessage(m Message) (subjectAndMessage, error) {
|
|||
}
|
||||
|
||||
sub := Subject{
|
||||
ToNode: string(m.ToNode),
|
||||
CommandOrEvent: tmpH.getKind(),
|
||||
Method: m.Method,
|
||||
ToNode: string(m.ToNode),
|
||||
Event: tmpH.getKind(),
|
||||
Method: m.Method,
|
||||
}
|
||||
|
||||
sam := subjectAndMessage{
|
||||
|
|
18
process.go
18
process.go
|
@ -246,7 +246,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
|
|||
|
||||
// If it is a NACK message we just deliver the message and return
|
||||
// here so we don't create a ACK message and then stop waiting for it.
|
||||
if p.subject.CommandOrEvent == EventNACK {
|
||||
if p.subject.Event == EventNACK {
|
||||
err := natsConn.PublishMsg(msg)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: nats publish of hello failed: %v", err)
|
||||
|
@ -283,7 +283,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
|
|||
|
||||
// If the message is an ACK type of message we must check that a
|
||||
// reply, and if it is not we don't wait here at all.
|
||||
if p.subject.CommandOrEvent == EventACK {
|
||||
if p.subject.Event == EventACK {
|
||||
// Wait up until ACKTimeout specified for a reply,
|
||||
// continue and resend if no reply received,
|
||||
// or exit if max retries for the message reached.
|
||||
|
@ -497,12 +497,12 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
|
|||
// sent from the publisher once, and if it is not delivered it will not be retried.
|
||||
switch {
|
||||
|
||||
// Check for ACK type Commands or Event.
|
||||
case p.subject.CommandOrEvent == EventACK:
|
||||
// Check for ACK type Event.
|
||||
case p.subject.Event == EventACK:
|
||||
// Look up the method handler for the specified method.
|
||||
mh, ok := p.methodsAvailable.CheckIfExists(message.Method)
|
||||
if !ok {
|
||||
er := fmt.Errorf("error: subscriberHandler: no such method type: %v", p.subject.CommandOrEvent)
|
||||
er := fmt.Errorf("error: subscriberHandler: no such method type: %v", p.subject.Event)
|
||||
p.processes.errorKernel.errSend(p, message, er)
|
||||
}
|
||||
|
||||
|
@ -520,11 +520,11 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
|
|||
// Send a confirmation message back to the publisher
|
||||
natsConn.Publish(msg.Reply, out)
|
||||
|
||||
// Check for NACK type Commands or Event.
|
||||
case p.subject.CommandOrEvent == EventNACK:
|
||||
// Check for NACK type Event.
|
||||
case p.subject.Event == EventNACK:
|
||||
mf, ok := p.methodsAvailable.CheckIfExists(message.Method)
|
||||
if !ok {
|
||||
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent)
|
||||
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Event)
|
||||
p.processes.errorKernel.errSend(p, message, er)
|
||||
}
|
||||
|
||||
|
@ -536,7 +536,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
|
|||
}
|
||||
|
||||
default:
|
||||
er := fmt.Errorf("info: did not find that specific type of command or event: %#v", p.subject.CommandOrEvent)
|
||||
er := fmt.Errorf("info: did not find that specific type of command or event: %#v", p.subject.Event)
|
||||
p.processes.errorKernel.infoSend(p, message, er)
|
||||
|
||||
}
|
||||
|
|
126
requests.go
126
requests.go
|
@ -145,67 +145,67 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
|
|||
ma := MethodsAvailable{
|
||||
Methodhandlers: map[Method]methodHandler{
|
||||
REQInitial: methodREQInitial{
|
||||
commandOrEvent: EventACK,
|
||||
event: EventACK,
|
||||
},
|
||||
REQOpProcessList: methodREQOpProcessList{
|
||||
commandOrEvent: EventACK,
|
||||
event: EventACK,
|
||||
},
|
||||
REQOpProcessStart: methodREQOpProcessStart{
|
||||
commandOrEvent: EventACK,
|
||||
event: EventACK,
|
||||
},
|
||||
REQOpProcessStop: methodREQOpProcessStop{
|
||||
commandOrEvent: EventACK,
|
||||
event: EventACK,
|
||||
},
|
||||
REQCliCommand: methodREQCliCommand{
|
||||
commandOrEvent: EventACK,
|
||||
event: EventACK,
|
||||
},
|
||||
REQCliCommandCont: methodREQCliCommandCont{
|
||||
commandOrEvent: EventACK,
|
||||
event: EventACK,
|
||||
},
|
||||
REQToConsole: methodREQToConsole{
|
||||
commandOrEvent: EventACK,
|
||||
event: EventACK,
|
||||
},
|
||||
REQTuiToConsole: methodREQTuiToConsole{
|
||||
commandOrEvent: EventACK,
|
||||
event: EventACK,
|
||||
},
|
||||
REQToFileAppend: methodREQToFileAppend{
|
||||
commandOrEvent: EventACK,
|
||||
event: EventACK,
|
||||
},
|
||||
REQToFile: methodREQToFile{
|
||||
commandOrEvent: EventACK,
|
||||
event: EventACK,
|
||||
},
|
||||
REQCopyFileFrom: methodREQCopyFileFrom{
|
||||
commandOrEvent: EventACK,
|
||||
event: EventACK,
|
||||
},
|
||||
REQCopyFileTo: methodREQCopyFileTo{
|
||||
commandOrEvent: EventACK,
|
||||
event: EventACK,
|
||||
},
|
||||
REQHello: methodREQHello{
|
||||
commandOrEvent: EventNACK,
|
||||
event: EventNACK,
|
||||
},
|
||||
REQErrorLog: methodREQErrorLog{
|
||||
commandOrEvent: EventACK,
|
||||
event: EventACK,
|
||||
},
|
||||
REQPing: methodREQPing{
|
||||
commandOrEvent: EventACK,
|
||||
event: EventACK,
|
||||
},
|
||||
REQPong: methodREQPong{
|
||||
commandOrEvent: EventACK,
|
||||
event: EventACK,
|
||||
},
|
||||
REQHttpGet: methodREQHttpGet{
|
||||
commandOrEvent: EventACK,
|
||||
event: EventACK,
|
||||
},
|
||||
REQTailFile: methodREQTailFile{
|
||||
commandOrEvent: EventACK,
|
||||
event: EventACK,
|
||||
},
|
||||
REQToSocket: methodREQToSocket{
|
||||
commandOrEvent: EventACK,
|
||||
event: EventACK,
|
||||
},
|
||||
REQRelay: methodREQRelay{
|
||||
commandOrEvent: EventACK,
|
||||
event: EventACK,
|
||||
},
|
||||
REQRelayInitial: methodREQRelayInitial{
|
||||
commandOrEvent: EventACK,
|
||||
event: EventACK,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -249,11 +249,11 @@ func getContextForMethodTimeout(ctx context.Context, message Message) (context.C
|
|||
|
||||
// Initial parent method used to start other processes.
|
||||
type methodREQInitial struct {
|
||||
commandOrEvent Event
|
||||
event Event
|
||||
}
|
||||
|
||||
func (m methodREQInitial) getKind() Event {
|
||||
return m.commandOrEvent
|
||||
return m.event
|
||||
}
|
||||
|
||||
func (m methodREQInitial) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
|
@ -388,11 +388,11 @@ type methodHandler interface {
|
|||
|
||||
// --- OpProcessList
|
||||
type methodREQOpProcessList struct {
|
||||
commandOrEvent Event
|
||||
event Event
|
||||
}
|
||||
|
||||
func (m methodREQOpProcessList) getKind() Event {
|
||||
return m.commandOrEvent
|
||||
return m.event
|
||||
}
|
||||
|
||||
// Handle Op Process List
|
||||
|
@ -426,11 +426,11 @@ func (m methodREQOpProcessList) handler(proc process, message Message, node stri
|
|||
// --- OpProcessStart
|
||||
|
||||
type methodREQOpProcessStart struct {
|
||||
commandOrEvent Event
|
||||
event Event
|
||||
}
|
||||
|
||||
func (m methodREQOpProcessStart) getKind() Event {
|
||||
return m.commandOrEvent
|
||||
return m.event
|
||||
}
|
||||
|
||||
// Handle Op Process Start
|
||||
|
@ -482,11 +482,11 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str
|
|||
// --- OpProcessStop
|
||||
|
||||
type methodREQOpProcessStop struct {
|
||||
commandOrEvent Event
|
||||
event Event
|
||||
}
|
||||
|
||||
func (m methodREQOpProcessStop) getKind() Event {
|
||||
return m.commandOrEvent
|
||||
return m.event
|
||||
}
|
||||
|
||||
// RecevingNode Node `json:"receivingNode"`
|
||||
|
@ -588,11 +588,11 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri
|
|||
// ----
|
||||
|
||||
type methodREQToFileAppend struct {
|
||||
commandOrEvent Event
|
||||
event Event
|
||||
}
|
||||
|
||||
func (m methodREQToFileAppend) getKind() Event {
|
||||
return m.commandOrEvent
|
||||
return m.event
|
||||
}
|
||||
|
||||
// Handle appending data to file.
|
||||
|
@ -642,11 +642,11 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin
|
|||
// -----
|
||||
|
||||
type methodREQToFile struct {
|
||||
commandOrEvent Event
|
||||
event Event
|
||||
}
|
||||
|
||||
func (m methodREQToFile) getKind() Event {
|
||||
return m.commandOrEvent
|
||||
return m.event
|
||||
}
|
||||
|
||||
// Handle writing to a file. Will truncate any existing data if the file did already
|
||||
|
@ -699,11 +699,11 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([]
|
|||
// ----
|
||||
|
||||
type methodREQCopyFileFrom struct {
|
||||
commandOrEvent Event
|
||||
event Event
|
||||
}
|
||||
|
||||
func (m methodREQCopyFileFrom) getKind() Event {
|
||||
return m.commandOrEvent
|
||||
return m.event
|
||||
}
|
||||
|
||||
// Handle writing to a file. Will truncate any existing data if the file did already
|
||||
|
@ -832,11 +832,11 @@ func copyFileFrom(ctx context.Context, wg *sync.WaitGroup, SrcFilePath string, e
|
|||
// ----
|
||||
|
||||
type methodREQCopyFileTo struct {
|
||||
commandOrEvent Event
|
||||
event Event
|
||||
}
|
||||
|
||||
func (m methodREQCopyFileTo) getKind() Event {
|
||||
return m.commandOrEvent
|
||||
return m.event
|
||||
}
|
||||
|
||||
// Handle writing to a file. Will truncate any existing data if the file did already
|
||||
|
@ -947,11 +947,11 @@ func (m methodREQCopyFileTo) handler(proc process, message Message, node string)
|
|||
|
||||
// ----
|
||||
type methodREQHello struct {
|
||||
commandOrEvent Event
|
||||
event Event
|
||||
}
|
||||
|
||||
func (m methodREQHello) getKind() Event {
|
||||
return m.commandOrEvent
|
||||
return m.event
|
||||
}
|
||||
|
||||
// Handler for receiving hello messages.
|
||||
|
@ -1001,11 +1001,11 @@ func (m methodREQHello) handler(proc process, message Message, node string) ([]b
|
|||
// ---
|
||||
|
||||
type methodREQErrorLog struct {
|
||||
commandOrEvent Event
|
||||
event Event
|
||||
}
|
||||
|
||||
func (m methodREQErrorLog) getKind() Event {
|
||||
return m.commandOrEvent
|
||||
return m.event
|
||||
}
|
||||
|
||||
// Handle the writing of error logs.
|
||||
|
@ -1050,11 +1050,11 @@ func (m methodREQErrorLog) handler(proc process, message Message, node string) (
|
|||
// ---
|
||||
|
||||
type methodREQPing struct {
|
||||
commandOrEvent Event
|
||||
event Event
|
||||
}
|
||||
|
||||
func (m methodREQPing) getKind() Event {
|
||||
return m.commandOrEvent
|
||||
return m.event
|
||||
}
|
||||
|
||||
// Handle receving a ping.
|
||||
|
@ -1114,11 +1114,11 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by
|
|||
// ---
|
||||
|
||||
type methodREQPong struct {
|
||||
commandOrEvent Event
|
||||
event Event
|
||||
}
|
||||
|
||||
func (m methodREQPong) getKind() Event {
|
||||
return m.commandOrEvent
|
||||
return m.event
|
||||
}
|
||||
|
||||
// Handle receiving a pong.
|
||||
|
@ -1171,11 +1171,11 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by
|
|||
// ---
|
||||
|
||||
type methodREQCliCommand struct {
|
||||
commandOrEvent Event
|
||||
event Event
|
||||
}
|
||||
|
||||
func (m methodREQCliCommand) getKind() Event {
|
||||
return m.commandOrEvent
|
||||
return m.event
|
||||
}
|
||||
|
||||
// handler to run a CLI command with timeout context. The handler will
|
||||
|
@ -1288,11 +1288,11 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
|
|||
// ---
|
||||
|
||||
type methodREQToConsole struct {
|
||||
commandOrEvent Event
|
||||
event Event
|
||||
}
|
||||
|
||||
func (m methodREQToConsole) getKind() Event {
|
||||
return m.commandOrEvent
|
||||
return m.event
|
||||
}
|
||||
|
||||
// Handler to write directly to console.
|
||||
|
@ -1321,11 +1321,11 @@ func (m methodREQToConsole) handler(proc process, message Message, node string)
|
|||
// ---
|
||||
|
||||
type methodREQTuiToConsole struct {
|
||||
commandOrEvent Event
|
||||
event Event
|
||||
}
|
||||
|
||||
func (m methodREQTuiToConsole) getKind() Event {
|
||||
return m.commandOrEvent
|
||||
return m.event
|
||||
}
|
||||
|
||||
// Handler to write directly to console.
|
||||
|
@ -1345,11 +1345,11 @@ func (m methodREQTuiToConsole) handler(proc process, message Message, node strin
|
|||
// ---
|
||||
|
||||
type methodREQHttpGet struct {
|
||||
commandOrEvent Event
|
||||
event Event
|
||||
}
|
||||
|
||||
func (m methodREQHttpGet) getKind() Event {
|
||||
return m.commandOrEvent
|
||||
return m.event
|
||||
}
|
||||
|
||||
// handler to do a Http Get.
|
||||
|
@ -1444,11 +1444,11 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
|
|||
// --- methodREQTailFile
|
||||
|
||||
type methodREQTailFile struct {
|
||||
commandOrEvent Event
|
||||
event Event
|
||||
}
|
||||
|
||||
func (m methodREQTailFile) getKind() Event {
|
||||
return m.commandOrEvent
|
||||
return m.event
|
||||
}
|
||||
|
||||
// handler to run a tailing of files with timeout context. The handler will
|
||||
|
@ -1537,11 +1537,11 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) (
|
|||
|
||||
// ---
|
||||
type methodREQCliCommandCont struct {
|
||||
commandOrEvent Event
|
||||
event Event
|
||||
}
|
||||
|
||||
func (m methodREQCliCommandCont) getKind() Event {
|
||||
return m.commandOrEvent
|
||||
return m.event
|
||||
}
|
||||
|
||||
// Handler to run REQCliCommandCont, which is the same as normal
|
||||
|
@ -1670,11 +1670,11 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
|
|||
// ---
|
||||
|
||||
type methodREQToSocket struct {
|
||||
commandOrEvent Event
|
||||
event Event
|
||||
}
|
||||
|
||||
func (m methodREQToSocket) getKind() Event {
|
||||
return m.commandOrEvent
|
||||
return m.event
|
||||
}
|
||||
|
||||
// TODO: Not implemented.
|
||||
|
@ -1693,11 +1693,11 @@ func (m methodREQToSocket) handler(proc process, message Message, node string) (
|
|||
// ----
|
||||
|
||||
type methodREQRelayInitial struct {
|
||||
commandOrEvent Event
|
||||
event Event
|
||||
}
|
||||
|
||||
func (m methodREQRelayInitial) getKind() Event {
|
||||
return m.commandOrEvent
|
||||
return m.event
|
||||
}
|
||||
|
||||
// Handler to relay messages via a host.
|
||||
|
@ -1797,11 +1797,11 @@ func (m methodREQRelayInitial) handler(proc process, message Message, node strin
|
|||
// ----
|
||||
|
||||
type methodREQRelay struct {
|
||||
commandOrEvent Event
|
||||
event Event
|
||||
}
|
||||
|
||||
func (m methodREQRelay) getKind() Event {
|
||||
return m.commandOrEvent
|
||||
return m.event
|
||||
}
|
||||
|
||||
// Handler to relay messages via a host.
|
||||
|
|
|
@ -162,9 +162,9 @@ func (r *ringBuffer) fillBuffer(ctx context.Context, inCh chan subjectAndMessage
|
|||
for {
|
||||
select {
|
||||
case v := <-inCh:
|
||||
// Check if the command or event exists in commandOrEvent.go
|
||||
if !eventAvailable.CheckIfExists(v.CommandOrEvent, v.Subject) {
|
||||
er := fmt.Errorf("error: fillBuffer: the event or command type do not exist, so this message will not be put on the buffer to be processed. Check the syntax used in the json file for the message. Allowed values are : %v, where given: event=%v, with subject=%v", eventAvailableValues, v.CommandOrEvent, v.Subject)
|
||||
// Check if the event exists.
|
||||
if !eventAvailable.CheckIfExists(v.Event, v.Subject) {
|
||||
er := fmt.Errorf("error: fillBuffer: the event type do not exist, so this message will not be put on the buffer to be processed. Check the syntax used in the json file for the message. Allowed values are : %v, where given: event=%v, with subject=%v", eventAvailableValues, v.Event, v.Subject)
|
||||
r.errorKernel.errSend(r.processInitial, Message{}, er)
|
||||
|
||||
// if it was not a valid value, we jump back up, and
|
||||
|
|
|
@ -408,7 +408,7 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
|
|||
s.errorKernel.errSend(s.processInitial, sam.Message, er)
|
||||
continue
|
||||
}
|
||||
if !eventAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) {
|
||||
if !eventAvailable.CheckIfExists(sam.Subject.Event, sam.Subject) {
|
||||
er := fmt.Errorf("error: routeMessagesToProcess: the command or event do not exist, message dropped: %v", sam.Message.Method)
|
||||
s.errorKernel.errSend(s.processInitial, sam.Message, er)
|
||||
|
||||
|
|
Loading…
Reference in a new issue