1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-05 20:09:16 +00:00

testing with message types

This commit is contained in:
postmannen 2021-02-05 07:25:12 +01:00
parent 98d5c4b3da
commit 7d5b2bccb9
5 changed files with 44 additions and 30 deletions

View file

@ -3,8 +3,11 @@ package main
import ( import (
"flag" "flag"
"log" "log"
"net/http"
"os" "os"
_ "net/http/pprof"
"github.com/RaaLabs/steward" "github.com/RaaLabs/steward"
) )
@ -13,8 +16,18 @@ func main() {
brokerAddress := flag.String("brokerAddress", "0", "the address of the message broker") brokerAddress := flag.String("brokerAddress", "0", "the address of the message broker")
modePublisher := flag.Bool("modePublisher", false, "set to true if it should be able to publish") modePublisher := flag.Bool("modePublisher", false, "set to true if it should be able to publish")
modeSubscriber := flag.Bool("modeSubscriber", false, "set to true if it should be able to subscribe") modeSubscriber := flag.Bool("modeSubscriber", false, "set to true if it should be able to subscribe")
profilingPort := flag.String("profilingPort", "", "The number of the profiling port")
flag.Parse() flag.Parse()
if *profilingPort != "" {
// TODO REMOVE: Added for profiling
go func() {
http.ListenAndServe("localhost:"+*profilingPort, nil)
}()
}
s, err := steward.NewServer(*brokerAddress, *nodeName) s, err := steward.NewServer(*brokerAddress, *nodeName)
if err != nil { if err != nil {
log.Printf("error: failed to connect to broker: %v\n", err) log.Printf("error: failed to connect to broker: %v\n", err)

View file

@ -9,7 +9,7 @@
}, },
"message": "message":
{ {
"data": ["bash","-c","uname -a"], "data": ["bash","-c","ls -l"],
"messageType":"Command" "messageType":"Command"
} }
} }

View file

@ -18,26 +18,27 @@ func getMessagesFromFile(directoryToCheck string, fileName string, fileContentCh
fileUpdated := make(chan bool) fileUpdated := make(chan bool)
go fileWatcherStart(directoryToCheck, fileUpdated) go fileWatcherStart(directoryToCheck, fileUpdated)
for { for range fileUpdated {
select {
case <-fileUpdated:
//load file, read it's content
b, err := readTruncateMessageFile(fileName)
if err != nil {
log.Printf("error: reading file: %v", err)
}
// unmarshal the JSON into a struct //load file, read it's content
js, err := jsonFromFileData(b) b, err := readTruncateMessageFile(fileName)
if err != nil { if err != nil {
log.Printf("%v\n", err) log.Printf("error: reading file: %v", err)
}
// Send the data back to be consumed
fileContentCh <- js
} }
}
// unmarshal the JSON into a struct
js, err := jsonFromFileData(b)
if err != nil {
log.Printf("%v\n", err)
}
for i, _ := range js {
fmt.Printf("messageType type: %T, messagetype contains: %#v\n", js[i].Subject.MessageType, js[i].Subject.MessageType)
}
// Send the data back to be consumed
fileContentCh <- js
}
} }
type jsonFromFile struct { type jsonFromFile struct {

View file

@ -76,6 +76,11 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) {
newMessagesCh: make(chan []jsonFromFile), newMessagesCh: make(chan []jsonFromFile),
} }
return s, nil
}
func (s *server) PublisherStart() {
// Start the error handler // Start the error handler
// TODO: For now it will just print the error messages to the // TODO: For now it will just print the error messages to the
// console. // console.
@ -94,11 +99,6 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) {
} }
}() }()
return s, nil
}
func (s *server) PublisherStart() {
// start the checking of files for input messages // start the checking of files for input messages
go getMessagesFromFile("./", "inmsg.txt", s.newMessagesCh) go getMessagesFromFile("./", "inmsg.txt", s.newMessagesCh)
@ -182,7 +182,7 @@ type Subject struct {
// node, the name of the node // node, the name of the node
Node string `json:"node" yaml:"node"` Node string `json:"node" yaml:"node"`
// messageType, command/event // messageType, command/event
MessageType string `json:"messageType" yaml:"messageType"` MessageType MessageType `json:"messageType" yaml:"messageType"`
// method, what is this message doing, etc. shellcommand, syslog, etc. // method, what is this message doing, etc. shellcommand, syslog, etc.
Method string `json:"method" yaml:"method"` Method string `json:"method" yaml:"method"`
// domain is used to differentiate services. Like there can be more // domain is used to differentiate services. Like there can be more
@ -197,7 +197,7 @@ type Subject struct {
// newSubject will return a new variable of the type subject, and insert // newSubject will return a new variable of the type subject, and insert
// all the values given as arguments. It will also create the channel // all the values given as arguments. It will also create the channel
// to receive new messages on the specific subject. // to receive new messages on the specific subject.
func newSubject(node string, messageType string, method string, domain string) Subject { func newSubject(node string, messageType MessageType, method string, domain string) Subject {
return Subject{ return Subject{
Node: node, Node: node,
MessageType: messageType, MessageType: messageType,

View file

@ -25,14 +25,14 @@ func (s *server) RunSubscriber() {
subject := fmt.Sprintf("%s.%s.%s.%s", s.nodeName, "command", "shellcommand", "shell") subject := fmt.Sprintf("%s.%s.%s.%s", s.nodeName, "command", "shellcommand", "shell")
_, err := s.natsConn.Subscribe(subject, listenForMessage(s.natsConn, reqMsgCh, s.nodeName)) _, err := s.natsConn.Subscribe(subject, listenForMessage(s.natsConn, reqMsgCh, s.nodeName))
if err != nil { if err != nil {
fmt.Printf("error: Subscribe failed: %v\n", err) log.Printf("error: Subscribe failed: %v\n", err)
} }
// Do some further processing of the actual data we received in the // Do some further processing of the actual data we received in the
// subscriber callback function. // subscriber callback function.
for { for {
msg := <-reqMsgCh msg := <-reqMsgCh
fmt.Printf("%v\n", msg) //fmt.Printf("%v\n", msg)
switch msg.MessageType { switch msg.MessageType {
case "Command": case "Command":
// Since the command to execute is at the first position in the // Since the command to execute is at the first position in the
@ -44,7 +44,7 @@ func (s *server) RunSubscriber() {
cmd.Stdout = os.Stdout cmd.Stdout = os.Stdout
err := cmd.Start() err := cmd.Start()
if err != nil { if err != nil {
fmt.Printf("error: execution of command failed: %v\n", err) log.Printf("error: execution of command failed: %v\n", err)
} }
case "Event": case "Event":
// Since the command to execute is at the first position in the // Since the command to execute is at the first position in the
@ -56,7 +56,7 @@ func (s *server) RunSubscriber() {
cmd.Stdout = os.Stdout cmd.Stdout = os.Stdout
err := cmd.Start() err := cmd.Start()
if err != nil { if err != nil {
fmt.Printf("error: execution of command failed: %v\n", err) log.Printf("error: execution of command failed: %v\n", err)
} }
default: default:
log.Printf("info: did not find that specific type of command: %#v\n", msg.MessageType) log.Printf("info: did not find that specific type of command: %#v\n", msg.MessageType)
@ -78,7 +78,7 @@ func listenForMessage(natsConn *nats.Conn, reqMsgCh chan Message, node string) f
gobDec := gob.NewDecoder(buf) gobDec := gob.NewDecoder(buf)
err := gobDec.Decode(&message) err := gobDec.Decode(&message)
if err != nil { if err != nil {
fmt.Printf("error: gob decoding failed: %v\n", err) log.Printf("error: gob decoding failed: %v\n", err)
} }
// Put the data recived on the channel for further processing // Put the data recived on the channel for further processing