diff --git a/cmd/main.go b/cmd/main.go index 1d33612..1ba4867 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -3,8 +3,11 @@ package main import ( "flag" "log" + "net/http" "os" + _ "net/http/pprof" + "github.com/RaaLabs/steward" ) @@ -13,8 +16,18 @@ func main() { brokerAddress := flag.String("brokerAddress", "0", "the address of the message broker") modePublisher := flag.Bool("modePublisher", false, "set to true if it should be able to publish") modeSubscriber := flag.Bool("modeSubscriber", false, "set to true if it should be able to subscribe") + profilingPort := flag.String("profilingPort", "", "The number of the profiling port") flag.Parse() + if *profilingPort != "" { + // TODO REMOVE: Added for profiling + + go func() { + http.ListenAndServe("localhost:"+*profilingPort, nil) + }() + + } + s, err := steward.NewServer(*brokerAddress, *nodeName) if err != nil { log.Printf("error: failed to connect to broker: %v\n", err) diff --git a/example-inmessage/orig-ship1.json b/example-inmessage/orig-ship1.json index fba61f2..8ba7561 100644 --- a/example-inmessage/orig-ship1.json +++ b/example-inmessage/orig-ship1.json @@ -9,7 +9,7 @@ }, "message": { - "data": ["bash","-c","uname -a"], + "data": ["bash","-c","ls -l"], "messageType":"Command" } } diff --git a/getmessagefromfile.go b/getmessagefromfile.go index b040bda..45e9ba6 100644 --- a/getmessagefromfile.go +++ b/getmessagefromfile.go @@ -18,26 +18,27 @@ func getMessagesFromFile(directoryToCheck string, fileName string, fileContentCh fileUpdated := make(chan bool) go fileWatcherStart(directoryToCheck, fileUpdated) - for { - select { - case <-fileUpdated: - //load file, read it's content - b, err := readTruncateMessageFile(fileName) - if err != nil { - log.Printf("error: reading file: %v", err) - } + for range fileUpdated { - // unmarshal the JSON into a struct - js, err := jsonFromFileData(b) - if err != nil { - log.Printf("%v\n", err) - } - - // Send the data back to be consumed - fileContentCh <- js + //load file, read it's content + b, err := readTruncateMessageFile(fileName) + if err != nil { + log.Printf("error: reading file: %v", err) } - } + // unmarshal the JSON into a struct + js, err := jsonFromFileData(b) + if err != nil { + log.Printf("%v\n", err) + } + + for i, _ := range js { + fmt.Printf("messageType type: %T, messagetype contains: %#v\n", js[i].Subject.MessageType, js[i].Subject.MessageType) + } + + // Send the data back to be consumed + fileContentCh <- js + } } type jsonFromFile struct { diff --git a/publisher.go b/publisher.go index 1198814..a58a0a2 100644 --- a/publisher.go +++ b/publisher.go @@ -76,6 +76,11 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) { newMessagesCh: make(chan []jsonFromFile), } + return s, nil + +} + +func (s *server) PublisherStart() { // Start the error handler // TODO: For now it will just print the error messages to the // console. @@ -94,11 +99,6 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) { } }() - return s, nil - -} - -func (s *server) PublisherStart() { // start the checking of files for input messages go getMessagesFromFile("./", "inmsg.txt", s.newMessagesCh) @@ -182,7 +182,7 @@ type Subject struct { // node, the name of the node Node string `json:"node" yaml:"node"` // messageType, command/event - MessageType string `json:"messageType" yaml:"messageType"` + MessageType MessageType `json:"messageType" yaml:"messageType"` // method, what is this message doing, etc. shellcommand, syslog, etc. Method string `json:"method" yaml:"method"` // domain is used to differentiate services. Like there can be more @@ -197,7 +197,7 @@ type Subject struct { // newSubject will return a new variable of the type subject, and insert // all the values given as arguments. It will also create the channel // to receive new messages on the specific subject. -func newSubject(node string, messageType string, method string, domain string) Subject { +func newSubject(node string, messageType MessageType, method string, domain string) Subject { return Subject{ Node: node, MessageType: messageType, diff --git a/subscriber.go b/subscriber.go index 716691c..650a348 100644 --- a/subscriber.go +++ b/subscriber.go @@ -25,14 +25,14 @@ func (s *server) RunSubscriber() { subject := fmt.Sprintf("%s.%s.%s.%s", s.nodeName, "command", "shellcommand", "shell") _, err := s.natsConn.Subscribe(subject, listenForMessage(s.natsConn, reqMsgCh, s.nodeName)) if err != nil { - fmt.Printf("error: Subscribe failed: %v\n", err) + log.Printf("error: Subscribe failed: %v\n", err) } // Do some further processing of the actual data we received in the // subscriber callback function. for { msg := <-reqMsgCh - fmt.Printf("%v\n", msg) + //fmt.Printf("%v\n", msg) switch msg.MessageType { case "Command": // Since the command to execute is at the first position in the @@ -44,7 +44,7 @@ func (s *server) RunSubscriber() { cmd.Stdout = os.Stdout err := cmd.Start() if err != nil { - fmt.Printf("error: execution of command failed: %v\n", err) + log.Printf("error: execution of command failed: %v\n", err) } case "Event": // Since the command to execute is at the first position in the @@ -56,7 +56,7 @@ func (s *server) RunSubscriber() { cmd.Stdout = os.Stdout err := cmd.Start() if err != nil { - fmt.Printf("error: execution of command failed: %v\n", err) + log.Printf("error: execution of command failed: %v\n", err) } default: log.Printf("info: did not find that specific type of command: %#v\n", msg.MessageType) @@ -78,7 +78,7 @@ func listenForMessage(natsConn *nats.Conn, reqMsgCh chan Message, node string) f gobDec := gob.NewDecoder(buf) err := gobDec.Decode(&message) if err != nil { - fmt.Printf("error: gob decoding failed: %v\n", err) + log.Printf("error: gob decoding failed: %v\n", err) } // Put the data recived on the channel for further processing