diff --git a/cmd/stew/main.go b/cmd/stew/main.go index ee94751..ebe124e 100644 --- a/cmd/stew/main.go +++ b/cmd/stew/main.go @@ -3,7 +3,7 @@ package main import ( "log" - "github.com/RaaLabs/steward" + steward "github.com/RaaLabs/steward/stew" ) func main() { diff --git a/configuration_flags.go b/configuration_flags.go index a5c4585..8cb6cbe 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -24,7 +24,7 @@ import ( type flagNodeSlice struct { value string OK bool - Values []node + Values []Node } // String method @@ -60,17 +60,17 @@ func (f *flagNodeSlice) Parse() error { if split[0] == "RST" { f.OK = false f.value = "" - f.Values = []node{} + f.Values = []Node{} return nil } fv := f.value sp := strings.Split(fv, ",") f.OK = true - f.Values = []node{} + f.Values = []Node{} for _, v := range sp { - f.Values = append(f.Values, node(v)) + f.Values = append(f.Values, Node(v)) } return nil } @@ -155,17 +155,17 @@ func newConfigurationDefaults() Configuration { CentralNodeName: "", RootCAPath: "", NkeySeedFile: "", - StartSubREQErrorLog: flagNodeSlice{Values: []node{}}, - StartSubREQHello: flagNodeSlice{OK: true, Values: []node{"*"}}, - StartSubREQToFileAppend: flagNodeSlice{OK: true, Values: []node{"*"}}, - StartSubREQToFile: flagNodeSlice{OK: true, Values: []node{"*"}}, - StartSubREQPing: flagNodeSlice{OK: true, Values: []node{"*"}}, - StartSubREQPong: flagNodeSlice{OK: true, Values: []node{"*"}}, - StartSubREQCliCommand: flagNodeSlice{OK: true, Values: []node{"*"}}, - StartSubREQnCliCommand: flagNodeSlice{OK: true, Values: []node{"*"}}, - StartSubREQToConsole: flagNodeSlice{OK: true, Values: []node{"*"}}, - StartSubREQHttpGet: flagNodeSlice{OK: true, Values: []node{"*"}}, - StartSubREQTailFile: flagNodeSlice{OK: true, Values: []node{"*"}}, + StartSubREQErrorLog: flagNodeSlice{Values: []Node{}}, + StartSubREQHello: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQToFileAppend: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQToFile: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQPing: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQPong: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQnCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQToConsole: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQHttpGet: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQTailFile: flagNodeSlice{OK: true, Values: []Node{"*"}}, } return c } diff --git a/message_and_subject.go b/message_and_subject.go index aa640cb..99a8d71 100644 --- a/message_and_subject.go +++ b/message_and_subject.go @@ -13,7 +13,7 @@ import ( type Message struct { // The node to send the message to - ToNode node `json:"toNode" yaml:"toNode"` + ToNode Node `json:"toNode" yaml:"toNode"` // The Unique ID of the message ID int `json:"id" yaml:"id"` // The actual data in the message @@ -25,7 +25,7 @@ type Message struct { // you can override it setting your own here. ReplyMethod Method `json:"replyMethod" yaml:"replyMethod"` // From what node the message originated - FromNode node + FromNode Node // ACKTimeout for waiting for an ack message ACKTimeout int `json:"ACKTimeout" yaml:"ACKTimeout"` // Resend retries @@ -83,7 +83,7 @@ func gobEncodeMessage(m Message) ([]byte, error) { // --- Subject -type node string +type Node string // subject contains the representation of a subject to be used with one // specific process @@ -107,7 +107,7 @@ type Subject struct { func newSubject(method Method, node string) Subject { // Get the CommandOrEvent type for the Method. ma := method.GetMethodsAvailable() - coe, ok := ma.methodhandlers[method] + coe, ok := ma.Methodhandlers[method] if !ok { log.Printf("error: no CommandOrEvent type specified for the method: %v\n", method) os.Exit(1) diff --git a/process.go b/process.go index b7219eb..040e2cc 100644 --- a/process.go +++ b/process.go @@ -31,7 +31,7 @@ type process struct { subject Subject // Put a node here to be able know the node a process is at. // NB: Might not be needed later on. - node node + node Node // The processID for the current process processID int // errorCh is the same channel the errorKernel uses to @@ -42,7 +42,7 @@ type process struct { errorCh chan errProcess processKind processKind // Who are we allowed to receive from ? - allowedReceivers map[node]struct{} + allowedReceivers map[Node]struct{} // methodsAvailable methodsAvailable MethodsAvailable // Helper or service function that can do some kind of work @@ -78,12 +78,12 @@ type process struct { // prepareNewProcess will set the the provided values and the default // values for a process. -func newProcess(natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []node, procFunc func() error) process { +func newProcess(natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []Node, procFunc func() error) process { // create the initial configuration for a sessions communicating with 1 host process. processes.lastProcessID++ // make the slice of allowedReceivers into a map value for easy lookup. - m := make(map[node]struct{}) + m := make(map[Node]struct{}) for _, a := range allowedReceivers { m[a] = struct{}{} } @@ -95,7 +95,7 @@ func newProcess(natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- proc := process{ messageID: 0, subject: subject, - node: node(configuration.NodeName), + node: Node(configuration.NodeName), processID: processes.lastProcessID, errorCh: errCh, processKind: processKind, @@ -158,7 +158,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) { err := p.procFunc(p.ctx) if err != nil { er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err) - sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) + sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er) } }() } @@ -178,7 +178,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) { err := p.procFunc(p.ctx) if err != nil { er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err) - sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) + sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er) } }() } @@ -211,7 +211,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) { dataPayload, err := gobEncodeMessage(message) if err != nil { er := fmt.Errorf("error: createDataPayload: %v", err) - sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) + sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er) continue } @@ -305,7 +305,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na err := gobDec.Decode(&message) if err != nil { er := fmt.Errorf("error: gob decoding failed: %v", err) - sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er) + sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er) } switch { @@ -313,7 +313,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na mh, ok := p.methodsAvailable.CheckIfExists(message.Method) if !ok { er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent) - sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er) + sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er) } out := []byte("not allowed from " + message.FromNode) @@ -332,11 +332,11 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na if err != nil { er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) - sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er) + sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er) } } else { er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject) - sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er) + sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er) } // Send a confirmation message back to the publisher @@ -346,7 +346,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na mf, ok := p.methodsAvailable.CheckIfExists(message.Method) if !ok { er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent) - sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er) + sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er) } // Check if we are allowed to receive from that host @@ -366,16 +366,16 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na if err != nil { er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) - sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er) + sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er) } } else { er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject) - sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er) + sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er) } // --- default: er := fmt.Errorf("info: did not find that specific type of command: %#v", p.subject.CommandOrEvent) - sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er) + sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er) } } @@ -413,7 +413,7 @@ func (p process) publishMessages(natsConn *nats.Conn) { case m = <-p.subject.messageCh: case <-p.ctx.Done(): er := fmt.Errorf("info: canceling publisher: %v", p.subject.name()) - sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) + sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er) return } // Get the process name so we can look up the process in the diff --git a/read_socket.go b/read_socket.go index be22b90..64dadd4 100644 --- a/read_socket.go +++ b/read_socket.go @@ -17,14 +17,14 @@ func (s *server) readSocket(toRingbufferCh chan []subjectAndMessage) { conn, err := s.netListener.Accept() if err != nil { er := fmt.Errorf("error: failed to accept conn on socket: %v", err) - sendErrorLogMessage(toRingbufferCh, node(s.nodeName), er) + sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er) } b := make([]byte, 65535) _, err = conn.Read(b) if err != nil { er := fmt.Errorf("error: failed to read data from socket: %v", err) - sendErrorLogMessage(toRingbufferCh, node(s.nodeName), er) + sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er) continue } @@ -34,7 +34,7 @@ func (s *server) readSocket(toRingbufferCh chan []subjectAndMessage) { sam, err := convertBytesToSAM(b) if err != nil { er := fmt.Errorf("error: malformed json: %v", err) - sendErrorLogMessage(toRingbufferCh, node(s.nodeName), er) + sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er) continue } @@ -42,7 +42,7 @@ func (s *server) readSocket(toRingbufferCh chan []subjectAndMessage) { // Fill in the value for the FromNode field, so the receiver // can check this field to know where it came from. - sam[i].Message.FromNode = node(s.nodeName) + sam[i].Message.FromNode = Node(s.nodeName) } // Send the SAM struct to be picked up by the ring buffer. diff --git a/ringbuffer.go b/ringbuffer.go index 14679ce..86459e5 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -38,12 +38,12 @@ type ringBuffer struct { totalMessagesIndex int mu sync.Mutex permStore chan string - nodeName node + nodeName Node newMessagesCh chan []subjectAndMessage } // newringBuffer is a push/pop storage for values. -func newringBuffer(c Configuration, size int, dbFileName string, nodeName node, newMessagesCh chan []subjectAndMessage) *ringBuffer { +func newringBuffer(c Configuration, size int, dbFileName string, nodeName Node, newMessagesCh chan []subjectAndMessage) *ringBuffer { // --- // Check if socket folder exists, if not create it if _, err := os.Stat(c.DatabaseFolder); os.IsNotExist(err) { @@ -134,7 +134,7 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri // Check if the command or event exists in commandOrEvent.go if !coeAvailable.CheckIfExists(v.CommandOrEvent, v.Subject) { er := fmt.Errorf("error: fillBuffer: the event or command type do not exist, so this message will not be put on the buffer to be processed. Check the syntax used in the json file for the message. Allowed values are : %v, where given: coe=%v, with subject=%v", coeAvailableValues, v.CommandOrEvent, v.Subject) - sendErrorLogMessage(r.newMessagesCh, node(r.nodeName), er) + sendErrorLogMessage(r.newMessagesCh, Node(r.nodeName), er) fmt.Println() // if it was not a valid value, we jump back up, and @@ -167,14 +167,14 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri js, err := json.Marshal(samV) if err != nil { er := fmt.Errorf("error:fillBuffer gob encoding samValue: %v", err) - sendErrorLogMessage(r.newMessagesCh, node(r.nodeName), er) + sendErrorLogMessage(r.newMessagesCh, Node(r.nodeName), er) } // Store the incomming message in key/value store err = r.dbUpdate(r.db, samValueBucket, strconv.Itoa(dbID), js) if err != nil { er := fmt.Errorf("error: dbUpdate samValue failed: %v", err) - sendErrorLogMessage(r.newMessagesCh, node(r.nodeName), er) + sendErrorLogMessage(r.newMessagesCh, Node(r.nodeName), er) } diff --git a/server.go b/server.go index 41b711e..1564cda 100644 --- a/server.go +++ b/server.go @@ -203,7 +203,7 @@ func (s *server) Start() { // processes are tied to the process struct, we need to create an // initial process to start the rest. sub := newSubject(REQInitial, s.nodeName) - p := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, "", []node{}, nil) + p := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, "", []Node{}, nil) p.ProcessesStart() time.Sleep(time.Second * 1) @@ -234,7 +234,7 @@ func (p *processes) printProcessesMap() { // sendErrorMessage will put the error message directly on the channel that is // read by the nats publishing functions. -func sendErrorLogMessage(newMessagesCh chan<- []subjectAndMessage, FromNode node, theError error) { +func sendErrorLogMessage(newMessagesCh chan<- []subjectAndMessage, FromNode Node, theError error) { // NB: Adding log statement here for more visuality during development. log.Printf("%v\n", theError) sam := createErrorMsgContent(FromNode, theError) @@ -243,7 +243,7 @@ func sendErrorLogMessage(newMessagesCh chan<- []subjectAndMessage, FromNode node // createErrorMsgContent will prepare a subject and message with the content // of the error -func createErrorMsgContent(FromNode node, theError error) subjectAndMessage { +func createErrorMsgContent(FromNode Node, theError error) subjectAndMessage { // Add time stamp er := fmt.Sprintf("%v, %v\n", time.Now().UTC(), theError.Error()) @@ -275,7 +275,7 @@ func createErrorMsgContent(FromNode node, theError error) subjectAndMessage { func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subjectAndMessage) { // Prepare and start a new ring buffer const bufferSize int = 1000 - rb := newringBuffer(*s.configuration, bufferSize, dbFileName, node(s.nodeName), s.toRingbufferCh) + rb := newringBuffer(*s.configuration, bufferSize, dbFileName, Node(s.nodeName), s.toRingbufferCh) inCh := make(chan subjectAndMessage) ringBufferOutCh := make(chan samDBValue) // start the ringbuffer. @@ -308,12 +308,12 @@ func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subject // Check if the format of the message is correct. if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok { er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method) - sendErrorLogMessage(s.toRingbufferCh, node(s.nodeName), er) + sendErrorLogMessage(s.toRingbufferCh, Node(s.nodeName), er) continue } if !coeAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) { er := fmt.Errorf("error: routeMessagesToProcess: the command or event do not exist, message dropped: %v", sam.Message.Method) - sendErrorLogMessage(s.toRingbufferCh, node(s.nodeName), er) + sendErrorLogMessage(s.toRingbufferCh, Node(s.nodeName), er) continue } diff --git a/startup_processes.go b/startup_processes.go index 71aeb77..8c470c7 100644 --- a/startup_processes.go +++ b/startup_processes.go @@ -18,7 +18,7 @@ func (p process) ProcessesStart() { { log.Printf("Starting REQOpCommand subscriber: %#v\n", p.node) sub := newSubject(REQOpCommand, string(p.node)) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, []node{node(p.configuration.CentralNodeName)}, nil) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, []Node{Node(p.configuration.CentralNodeName)}, nil) go proc.spawnWorker(p.processes, p.natsConn) } @@ -99,7 +99,7 @@ func (s startup) pubREQHello(p process) { log.Printf("Starting Hello Publisher: %#v\n", p.node) sub := newSubject(REQHello, p.configuration.CentralNodeName) - proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindPublisher, []node{}, nil) + proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindPublisher, []Node{}, nil) // Define the procFunc to be used for the process. proc.procFunc = procFunc( @@ -111,8 +111,8 @@ func (s startup) pubREQHello(p process) { d := fmt.Sprintf("Hello from %v\n", p.node) m := Message{ - ToNode: node(p.configuration.CentralNodeName), - FromNode: node(p.node), + ToNode: Node(p.configuration.CentralNodeName), + FromNode: Node(p.node), Data: []string{d}, Method: REQHello, } @@ -189,7 +189,7 @@ func (s startup) subREQHello(p process) { // of the nodes we've received hello's from in the sayHelloNodes map, // which is the information we pass along to generate metrics. proc.procFunc = func(ctx context.Context) error { - sayHelloNodes := make(map[node]struct{}) + sayHelloNodes := make(map[Node]struct{}) promHelloNodes := promauto.NewGauge(prometheus.GaugeOpts{ Name: "hello_nodes_total", diff --git a/stew.go b/stew/stew.go similarity index 95% rename from stew.go rename to stew/stew.go index 1cff416..c4137c2 100644 --- a/stew.go +++ b/stew/stew.go @@ -14,6 +14,8 @@ import ( "github.com/gdamore/tcell/v2" "github.com/rivo/tview" + + "github.com/RaaLabs/steward" ) type Stew struct { @@ -125,7 +127,7 @@ func (p *pageMessage) start() error { // template, and we can add case statements for those fields below // that we do not wan't to check. func compareMsgAndMessage() error { - stewardMessage := Message{} + stewardMessage := steward.Message{} stewMsg := msg{} stewardRefVal := reflect.ValueOf(stewardMessage) @@ -206,16 +208,16 @@ func (p *pageMessage) drawMsgForm() error { value := `"bash","-c","..."` p.msgInputForm.AddInputField(fieldName, value, 30, nil, nil) case "Method": - var m Method + var m steward.Method ma := m.GetMethodsAvailable() values := []string{} - for k := range ma.methodhandlers { + for k := range ma.Methodhandlers { values = append(values, string(k)) } p.msgInputForm.AddDropDown(fieldName, values, 0, nil).SetItemPadding(1) case "ReplyMethod": - var m Method - rm := m.getReplyMethods() + var m steward.Method + rm := m.GetReplyMethods() values := []string{} for _, k := range rm { values = append(values, string(k)) @@ -270,10 +272,10 @@ func (p *pageMessage) drawMsgForm() error { formItems := []formItem{} // Get values values to be used for the "Method" dropdown. - var m Method + var m steward.Method ma := m.GetMethodsAvailable() values := []string{} - for k := range ma.methodhandlers { + for k := range ma.Methodhandlers { values = append(values, string(k)) } @@ -319,10 +321,10 @@ func (p *pageMessage) drawMsgForm() error { } // Get values values to be used for the "Method" dropdown. - var m Method + var m steward.Method ma := m.GetMethodsAvailable() values := []string{} - for k := range ma.methodhandlers { + for k := range ma.Methodhandlers { values = append(values, string(k)) } @@ -406,8 +408,8 @@ func (p *pageMessage) drawMsgForm() error { // TODO: Should also add a write directly to socket here. AddButton("generate to console", func() { // --- - opCmdStartProc := OpCmdStartProc{} - opCmdStopProc := OpCmdStopProc{} + opCmdStartProc := steward.OpCmdStartProc{} + opCmdStopProc := steward.OpCmdStopProc{} // --- // fh, err := os.Create("message.json") @@ -432,7 +434,7 @@ func (p *pageMessage) drawMsgForm() error { return } - m.ToNode = node(value) + m.ToNode = steward.Node(value) case "Data": // Split the comma separated string into a // and remove the start and end ampersand. @@ -457,9 +459,9 @@ func (p *pageMessage) drawMsgForm() error { m.Data = data case "Method": - m.Method = Method(value) + m.Method = steward.Method(value) case "ReplyMethod": - m.ReplyMethod = Method(value) + m.ReplyMethod = steward.Method(value) case "ACKTimeout": v, _ := strconv.Atoi(value) m.ACKTimeout = v @@ -497,13 +499,13 @@ func (p *pageMessage) drawMsgForm() error { fmt.Fprintf(p.logForm, "%v : error: missing startProc Method\n", time.Now().Format("Mon Jan _2 15:04:05 2006")) return } - opCmdStartProc.Method = Method(value) + opCmdStartProc.Method = steward.Method(value) case "startProc AllowedNodes": // Split the comma separated string into a // and remove the start and end ampersand. sp := strings.Split(value, ",") - var allowedNodes []node + var allowedNodes []steward.Node for _, v := range sp { // Check if format is correct, return if not. @@ -517,7 +519,7 @@ func (p *pageMessage) drawMsgForm() error { v = v[1:] v = strings.TrimSuffix(v, "\"") - allowedNodes = append(allowedNodes, node(v)) + allowedNodes = append(allowedNodes, steward.Node(v)) } opCmdStartProc.AllowedNodes = allowedNodes @@ -604,15 +606,15 @@ func getNodeNames(filePath string) ([]string, error) { // empty fields. type msg struct { // The node to send the message to - ToNode node `json:"toNode" yaml:"toNode"` + ToNode steward.Node `json:"toNode" yaml:"toNode"` // The actual data in the message Data []string `json:"data" yaml:"data"` // Method, what is this message doing, etc. CLI, syslog, etc. - Method Method `json:"method" yaml:"method"` + Method steward.Method `json:"method" yaml:"method"` // ReplyMethod, is the method to use for the reply message. // By default the reply method will be set to log to file, but // you can override it setting your own here. - ReplyMethod Method `json:"replyMethod" yaml:"replyMethod"` + ReplyMethod steward.Method `json:"replyMethod" yaml:"replyMethod"` // From what node the message originated ACKTimeout int `json:"ACKTimeout" yaml:"ACKTimeout"` // Resend retries diff --git a/subscriber_method_types.go b/subscriber_method_types.go index ee1ffbe..aa3b782 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -136,7 +136,7 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { // Command, Used to make a request to perform an action // Event, Used to communicate that an action has been performed. ma := MethodsAvailable{ - methodhandlers: map[Method]methodHandler{ + Methodhandlers: map[Method]methodHandler{ REQInitial: methodREQInitial{ commandOrEvent: CommandACK, }, @@ -186,7 +186,7 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { } // Reply methods -func (m Method) getReplyMethods() []Method { +func (m Method) GetReplyMethods() []Method { rm := []Method{REQToConsole, REQToFile, REQToFileAppend} return rm } @@ -196,7 +196,7 @@ func (m Method) getReplyMethods() []Method { // as input argument. func (m Method) getHandler(method Method) methodHandler { ma := m.GetMethodsAvailable() - mh := ma.methodhandlers[method] + mh := ma.Methodhandlers[method] return mh } @@ -223,14 +223,14 @@ func (m methodREQInitial) handler(proc process, message Message, node string) ([ // ---- type MethodsAvailable struct { - methodhandlers map[Method]methodHandler + Methodhandlers map[Method]methodHandler } // Check if exists will check if the Method is defined. If true the bool // value will be set to true, and the methodHandler function for that type // will be returned. func (ma MethodsAvailable) CheckIfExists(m Method) (methodHandler, bool) { - mFunc, ok := ma.methodhandlers[m] + mFunc, ok := ma.Methodhandlers[m] if ok { // fmt.Printf("******THE TOPIC EXISTS: %v******\n", m) return mFunc, true @@ -261,11 +261,11 @@ func (m methodREQOpCommand) getKind() CommandOrEvent { type OpCmdStartProc struct { Method Method `json:"method"` - AllowedNodes []node `json:"allowedNodes"` + AllowedNodes []Node `json:"allowedNodes"` } type OpCmdStopProc struct { - RecevingNode node `json:"receivingNode"` + RecevingNode Node `json:"receivingNode"` Method Method `json:"method"` Kind processKind `json:"kind"` ID int `json:"id"`