diff --git a/message_and_subject.go b/message_and_subject.go index 99b56c0..e124849 100644 --- a/message_and_subject.go +++ b/message_and_subject.go @@ -44,7 +44,7 @@ type Message struct { // fields. IsReply bool `json:"isReply" yaml:"isReply"` // From what node the message originated - FromNode Node + FromNode Node `json:"fromNode" yaml:"fromNode"` // ACKTimeout for waiting for an ack message ACKTimeout int `json:"ACKTimeout" yaml:"ACKTimeout"` // Resend retries diff --git a/read_socket_or_tcp_listener.go b/message_readers.go similarity index 72% rename from read_socket_or_tcp_listener.go rename to message_readers.go index 3c10434..9a00910 100644 --- a/read_socket_or_tcp_listener.go +++ b/message_readers.go @@ -5,12 +5,123 @@ import ( "encoding/json" "fmt" "io" + "io/ioutil" "log" "net" "net/http" "os" + "path/filepath" ) +// readStartupFolder will check the /startup folder when Steward +// starts for messages to process. +// The purpose of the startup folder is that we can define messages on a +// node that will be run when Steward starts up. +// Messages defined in the startup folder should have the toNode set to +// self, and the from node set to where we want the answer sent. The reason +// for this is that all replies normally pick up the host from the original +// first message, but here we inject it on an end node so we need to specify +// the fromNode to get the reply back to the node we want. +func (s *server) readStartupFolder() { + + // Get the names of all the files in the startup folder. + const startupFolder = "startup" + filePaths, err := s.getFilePaths(startupFolder) + if err != nil { + er := fmt.Errorf("error: readStartupFolder: unable to get filenames: %v", err) + s.errorKernel.errSend(s.processInitial, Message{}, er) + return + } + + for _, filePath := range filePaths { + + // Read the content of each file. + readBytes, err := func(filePath string) ([]byte, error) { + fh, err := os.Open(filePath) + if err != nil { + er := fmt.Errorf("error: failed to open file in startup folder: %v", err) + return nil, er + } + defer fh.Close() + + b, err := io.ReadAll(fh) + if err != nil { + er := fmt.Errorf("error: failed to read file in startup folder: %v", err) + return nil, er + } + + return b, nil + }(filePath) + + if err != nil { + s.errorKernel.errSend(s.processInitial, Message{}, err) + continue + } + + readBytes = bytes.Trim(readBytes, "\x00") + + // unmarshal the JSON into a struct + sams, err := s.convertBytesToSAMs(readBytes) + if err != nil { + er := fmt.Errorf("error: startup folder: malformed json read: %v", err) + s.errorKernel.errSend(s.processInitial, Message{}, er) + continue + } + + // Check if fromNode field is specified, and remove the message if blank. + for i := range sams { + if sams[i].Message.FromNode == "" { + sams = append(sams[:i], sams[i+1:]...) + log.Printf(" error: missing from field in startup message\n") + } + + // Bounds check. + if i == len(sams)-1 { + break + } + } + + // Send the SAM struct to be picked up by the ring buffer. + s.ringBufferBulkInCh <- sams + + } +} + +// getFilePaths will get the names of all the messages in +// the folder specified from current working directory. +func (s *server) getFilePaths(dirName string) ([]string, error) { + dirPath, err := os.Getwd() + if err != nil { + return nil, fmt.Errorf("error: startup folder: unable to get the working directory %v: %v", dirPath, err) + } + + dirPath = filepath.Join(dirPath, dirName) + + // Check if the startup folder exist. + if _, err := os.Stat(dirPath); os.IsNotExist(err) { + err := os.MkdirAll(dirPath, 0700) + if err != nil { + er := fmt.Errorf("error: failed to create startup folder: %v", err) + return nil, er + } + } + + fInfo, err := ioutil.ReadDir(dirPath) + if err != nil { + er := fmt.Errorf("error: failed to get filenames in startup folder: %v", err) + return nil, er + } + + filePaths := []string{} + + for _, v := range fInfo { + realpath := filepath.Join(dirPath, v.Name()) + filePaths = append(filePaths, realpath) + } + + return filePaths, nil +} + // readSocket will read the .sock file specified. // It will take a channel of []byte as input, and it is in this // channel the content of a file that has changed is returned. diff --git a/server.go b/server.go index 839700d..1e5fa67 100644 --- a/server.go +++ b/server.go @@ -274,6 +274,9 @@ func (s *server) Start() { // so we can cancel this context last, and not use the server. s.routeMessagesToProcess("./incomingBuffer.db") + // Check and enable read the messages specified in the startup folder. + s.readStartupFolder() + } // Will stop all processes started during startup. diff --git a/tui.go b/tui.go index bd9a012..9eaf791 100644 --- a/tui.go +++ b/tui.go @@ -456,7 +456,7 @@ func (t *tui) messageSlide(app *tview.Application) tview.Primitive { // Add a dropdown menu to select message files to use. - msgsValues := getMessageNames(p.logForm) + msgsValues := t.getMessageNames(p.logForm) msgDropdownFunc := func(msgFileName string, index int) { filePath := filepath.Join("messages", msgFileName) @@ -500,7 +500,7 @@ func (t *tui) messageSlide(app *tview.Application) tview.Primitive { p.selectMessage.AddFormItem(messageDropdown) p.inputForm.AddButton("update message dropdown menu", func() { - messageMessageValues := getMessageNames(p.logForm) + messageMessageValues := t.getMessageNames(p.logForm) messageDropdown.SetLabel("message").SetOptions(messageMessageValues, msgDropdownFunc) }) @@ -664,7 +664,7 @@ func (t *tui) messageSlide(app *tview.Application) tview.Primitive { fmt.Fprintf(p.logForm, "info: succesfully wrote message to file: %v\n", file) // update the select message dropdown - messageMessageValues := getMessageNames(p.logForm) + messageMessageValues := t.getMessageNames(p.logForm) messageDropdown.SetLabel("message").SetOptions(messageMessageValues, msgDropdownFunc) // p.inputForm.Clear(false) @@ -726,7 +726,7 @@ func (t *tui) console(app *tview.Application) tview.Primitive { nodesDropdown.SetLabel("nodes").SetOptions(nodesList, nil) p.selectForm.AddFormItem(nodesDropdown) - msgsValues := getMessageNames(p.outputForm) + msgsValues := t.getMessageNames(p.outputForm) messageDropdown := tview.NewDropDown() messageDropdown.SetLabelColor(tcell.ColorIndianRed) @@ -741,7 +741,7 @@ func (t *tui) console(app *tview.Application) tview.Primitive { } nodesDropdown.SetLabel("nodes").SetOptions(nodesList, nil) - msgsValues := getMessageNames(p.outputForm) + msgsValues := t.getMessageNames(p.outputForm) messageDropdown.SetLabel("message").SetOptions(msgsValues, nil) }) @@ -758,7 +758,7 @@ func (t *tui) console(app *tview.Application) tview.Primitive { } nodesDropdown.SetLabel("nodes").SetOptions(nodesList, nil) - messageValues := getMessageNames(p.outputForm) + messageValues := t.getMessageNames(p.outputForm) messageDropdown.SetLabel("message").SetOptions(messageValues, nil) }) @@ -839,7 +839,7 @@ func (t *tui) console(app *tview.Application) tview.Primitive { // getMessageNames will get the names of all the messages in // the messages folder. -func getMessageNames(outputForm *tview.TextView) []string { +func (t *tui) getMessageNames(outputForm *tview.TextView) []string { // Create messages dropdown field. fInfo, err := ioutil.ReadDir("messages") if err != nil {