From dc2352cab7a487270501c4d01da26df5f73a6421 Mon Sep 17 00:00:00 2001 From: postmannen Date: Wed, 3 Feb 2021 22:08:28 +0100 Subject: [PATCH] refactoring for reading input from file...and more --- getmessagefromfile.go | 38 +++++++++++++----- go.mod | 2 + go.sum | 5 +++ orig.json | 30 ++++++++++++++ orig.yaml | 23 +++++++++++ publisher.go | 91 ++++++++++++++++++++----------------------- subscriber.go | 2 +- 7 files changed, 131 insertions(+), 60 deletions(-) create mode 100644 orig.json create mode 100644 orig.yaml diff --git a/getmessagefromfile.go b/getmessagefromfile.go index 3dd6695..dd06ebf 100644 --- a/getmessagefromfile.go +++ b/getmessagefromfile.go @@ -2,6 +2,7 @@ package steward import ( "bufio" + "encoding/json" "fmt" "io" "log" @@ -13,7 +14,7 @@ import ( // getMessagesFromFile will start a file watcher for the given directory // and filename. It will take a channel of []byte as input, and it is // in this channel the content of a file that has changed is returned. -func getMessagesFromFile(directoryToCheck string, fileName string, fileContentCh chan []byte) { +func getMessagesFromFile(directoryToCheck string, fileName string, fileContentCh chan []jsonFromFile) { fileUpdated := make(chan bool) go fileWatcherStart(directoryToCheck, fileUpdated) @@ -26,13 +27,36 @@ func getMessagesFromFile(directoryToCheck string, fileName string, fileContentCh log.Printf("error: reading file: %v", err) } - fileContentCh <- b - fmt.Printf("File content read: %s\n", b) + // unmarshal the JSON into a struct + js, err := jsonFromFileData(b) + if err != nil { + log.Printf("%v\n", err) + } + + // Send the data back to be consumed + fileContentCh <- js } } } +type jsonFromFile struct { + Subject `json:"subject"` + Message `json:"message"` +} + +func jsonFromFileData(b []byte) ([]jsonFromFile, error) { + JS := []jsonFromFile{} + + //err := yaml.Unmarshal(b, &JS) + err := json.Unmarshal(b, &JS) + if err != nil { + return nil, fmt.Errorf("error: json unmarshal of file failed: %v", err) + } + + return JS, nil +} + // readTruncateMessageFile, will read all the messages in the given // file, and truncate the file after read. // A []byte will be returned with the content read. @@ -53,20 +77,14 @@ func readTruncateMessageFile(fileName string) ([]byte, error) { lines = append(lines, scanner.Bytes()...) } - fmt.Printf("*** DEBUG : %s\n", lines) - - fmt.Printf("read: %s\n", lines) - // empty the file after all is read - ret, err := f.Seek(0, io.SeekStart) + _, err = f.Seek(0, io.SeekStart) if err != nil { return nil, fmt.Errorf("f.Seek failed: %v", err) } - fmt.Printf("** ret=%v\n", ret) err = f.Truncate(0) if err != nil { - fmt.Printf("******* %#v\n", err) return nil, fmt.Errorf("f.Truncate failed: %v", err) } diff --git a/go.mod b/go.mod index d77e095..154f88b 100644 --- a/go.mod +++ b/go.mod @@ -4,8 +4,10 @@ go 1.15 require ( github.com/fsnotify/fsnotify v1.4.9 + github.com/ghodss/yaml v1.0.0 github.com/golang/protobuf v1.4.3 // indirect github.com/nats-io/nats-server/v2 v2.1.9 // indirect github.com/nats-io/nats.go v1.10.0 google.golang.org/protobuf v1.25.0 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 519afdb..9b01a8e 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= +github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -87,5 +89,8 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/orig.json b/orig.json new file mode 100644 index 0000000..6509e2b --- /dev/null +++ b/orig.json @@ -0,0 +1,30 @@ +[ + { + "subject": + { + "node":"ship1", + "messageType":"command", + "method":"shellcommand", + "domain":"shell" + }, + "message": + { + "data": ["bash","-c","uname -a"], + "messageType":"eventReturnAck" + } + }, + { + "subject": + { + "node":"ship2", + "messageType":"command", + "method":"shellcommand", + "domain":"shell" + }, + "message": + { + "data": ["bash","-c","uname -a"], + "messageType":"eventReturnAck" + } + } + ] \ No newline at end of file diff --git a/orig.yaml b/orig.yaml new file mode 100644 index 0000000..27e4cd1 --- /dev/null +++ b/orig.yaml @@ -0,0 +1,23 @@ +--- +- subject: + node: ship1 + messageType: command + method: shellcommand + domain: shell + message: + data: + - bash + - "-c" + - uname -a + messageType: eventReturnAck +- subject: + node: ship2 + messageType: command + method: shellcommand + domain: shell + message: + data: + - bash + - "-c" + - uname -a + messageType: eventReturnAck \ No newline at end of file diff --git a/publisher.go b/publisher.go index 3db5aa1..860f84d 100644 --- a/publisher.go +++ b/publisher.go @@ -12,9 +12,7 @@ import ( "github.com/nats-io/nats.go" ) -var mu sync.Mutex - -type messageType int +type MessageType string // TODO: Figure it makes sense to have these types at all. // It might make more sense to implement these as two @@ -25,13 +23,13 @@ const ( // delivered back in the reply ack message. // The message should contain the unique ID of the // command. - commandReturnOutput messageType = iota + CommandReturnOutput MessageType = "commandReturnOutput" // shellCommand, wait for and return the output // of the command in the ACK message. This means // that the command should be executed immediately // and that we should get the confirmation that it // was successful or not. - eventReturnAck messageType = iota + EventReturnAck MessageType = "eventReturnAck" // eventCommand, just wait for the ACK that the // message is received. What action happens on the // receiving side is up to the received to decide. @@ -39,13 +37,13 @@ const ( type Message struct { // The Unique ID of the message - ID int + ID int `json:"id"` // The actual data in the message // TODO: Change this to a slice instead...or maybe use an // interface type here to handle several data types ? - Data []string + Data []string `json:"data"` // The type of the message being sent - MessageType messageType + MessageType MessageType `json:"messageType"` } // server is the structure that will hold the state about spawned @@ -57,6 +55,7 @@ type server struct { // The last processID created lastProcessID int nodeName string + mu sync.Mutex } // newServer will prepare and return a server type @@ -72,6 +71,9 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) { processes: make(map[subjectName]process), } + // Start the error handler + // TODO: For now it will just print the error messages to the + // console. go func() { for { @@ -93,29 +95,23 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) { func (s *server) PublisherStart() { // start the checking of files for input messages - fileReadCh := make((chan []byte)) + fileReadCh := make((chan []jsonFromFile)) go getMessagesFromFile("./", "inmsg.txt", fileReadCh) // TODO: For now we just print content of the files read. - // Replace this whit a broker function that will know how + // Replace this with a broker function that will know how // send it on to the correct publisher. go func() { - for b := range fileReadCh { + for v := range fileReadCh { // Check if there are new content read from file input - fmt.Printf("received: %s\n", b) + fmt.Printf("received: %#v\n", v) } }() // Prepare and start a single process { - sub := subject{ - node: "btship1", - messageType: "command", - method: "shellcommand", - domain: "shell", - messageCh: make(chan Message), - } + sub := newSubject("btship1", "command", "shellcommand", "shell") proc := s.processPrepareNew(sub) // fmt.Printf("*** %#v\n", proc) go s.processSpawn(proc) @@ -123,13 +119,7 @@ func (s *server) PublisherStart() { // Prepare and start a single process { - sub := subject{ - node: "btship2", - messageType: "command", - method: "shellcommand", - domain: "shell", - messageCh: make(chan Message), - } + sub := newSubject("btship2", "command", "shellcommand", "shell") proc := s.processPrepareNew(sub) // fmt.Printf("*** %#v\n", proc) go s.processSpawn(proc) @@ -140,7 +130,7 @@ func (s *server) PublisherStart() { for { m := Message{ Data: []string{"bash", "-c", "uname -a"}, - MessageType: eventReturnAck, + MessageType: EventReturnAck, } subjName := subjectName("btship1.command.shellcommand.shell") _, ok := s.processes[subjName] @@ -179,26 +169,39 @@ type node string // subject contains the representation of a subject to be used with one // specific process -type subject struct { +type Subject struct { // node, the name of the node - node string + Node string `json:"node"` // messageType, command/event - messageType string + MessageType string `json:"messageType"` // method, what is this message doing, etc. shellcommand, syslog, etc. - method string + Method string `json:"method"` // domain is used to differentiate services. Like there can be more // logging services, but rarely more logging services for the same // thing. Domain is here used to differentiate the the services and // tell with one word what it is for. - domain string + Domain string `json:"domain"` // messageCh is the channel for receiving new content to be sent messageCh chan Message } +// newSubject will return a new variable of the type subject, and insert +// all the values given as arguments. It will also create the channel +// to receive new messages on the specific subject. +func newSubject(node string, messageType string, method string, domain string) Subject { + return Subject{ + Node: node, + MessageType: messageType, + Method: method, + Domain: domain, + messageCh: make(chan Message), + } +} + type subjectName string -func (s subject) name() subjectName { - return subjectName(fmt.Sprintf("%s.%s.%s.%s", s.node, s.messageType, s.method, s.domain)) +func (s Subject) name() subjectName { + return subjectName(fmt.Sprintf("%s.%s.%s.%s", s.Node, s.MessageType, s.Method, s.Domain)) } // process are represent the communication to one individual host @@ -207,7 +210,7 @@ type process struct { // the subject used for the specific process. One process // can contain only one sender on a message bus, hence // also one subject - subject subject + subject Subject // Put a node here to be able know the node a process is at. // NB: Might not be needed later on. node node @@ -223,13 +226,13 @@ type process struct { // prepareNewProcess will set the the provided values and the default // values for a process. -func (s *server) processPrepareNew(subject subject) process { +func (s *server) processPrepareNew(subject Subject) process { // create the initial configuration for a sessions communicating with 1 host process. s.lastProcessID++ proc := process{ messageID: 0, subject: subject, - node: node(subject.node), + node: node(subject.Node), processID: s.lastProcessID, errorCh: make(chan string), //messageCh: make(chan Message), @@ -242,12 +245,12 @@ func (s *server) processPrepareNew(subject subject) process { // the next available ID, and also add the process to the processes // map. func (s *server) processSpawn(proc process) { - mu.Lock() + s.mu.Lock() // We use the full name of the subject to identify a unique // process. We can do that since a process can only handle // one message queue. s.processes[proc.subject.name()] = proc - mu.Unlock() + s.mu.Unlock() // Loop creating one new message every second to simulate getting new // messages to deliver. @@ -258,7 +261,6 @@ func (s *server) processSpawn(proc process) { // is listened on in the for loop below could be used to receive the // messages from the message-pickup-process. for { - // m := getMessageToDeliver() // Wait and read the next message on the message channel m := <-proc.subject.messageCh m.ID = s.processes[proc.subject.name()].messageID @@ -277,15 +279,6 @@ func (s *server) processSpawn(proc process) { } } -// get MessageToDeliver will pick up the next message to be created. -// TODO: read this from local file or rest or....? -func getMessageToDeliver() Message { - return Message{ - Data: []string{"bash", "-c", "uname -a"}, - MessageType: eventReturnAck, - } -} - func messageDeliver(proc process, message Message, natsConn *nats.Conn) { for { dataPayload, err := gobEncodePayload(message) diff --git a/subscriber.go b/subscriber.go index 6400084..d059f0c 100644 --- a/subscriber.go +++ b/subscriber.go @@ -33,7 +33,7 @@ func (s *server) RunSubscriber() { msg := <-reqMsgCh fmt.Printf("%v\n", msg) switch msg.MessageType { - case eventReturnAck: + case EventReturnAck: // Since the command to execute is at the first position in the // slice we need to slice it out. The arguments are at the // remaining positions.