mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-18 21:59:30 +00:00
added logic for startup folder
This commit is contained in:
parent
c1e11f0709
commit
88672f1e35
4 changed files with 122 additions and 8 deletions
|
@ -44,7 +44,7 @@ type Message struct {
|
|||
// fields.
|
||||
IsReply bool `json:"isReply" yaml:"isReply"`
|
||||
// From what node the message originated
|
||||
FromNode Node
|
||||
FromNode Node `json:"fromNode" yaml:"fromNode"`
|
||||
// ACKTimeout for waiting for an ack message
|
||||
ACKTimeout int `json:"ACKTimeout" yaml:"ACKTimeout"`
|
||||
// Resend retries
|
||||
|
|
|
@ -5,12 +5,123 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
// readStartupFolder will check the <workdir>/startup folder when Steward
|
||||
// starts for messages to process.
|
||||
// The purpose of the startup folder is that we can define messages on a
|
||||
// node that will be run when Steward starts up.
|
||||
// Messages defined in the startup folder should have the toNode set to
|
||||
// self, and the from node set to where we want the answer sent. The reason
|
||||
// for this is that all replies normally pick up the host from the original
|
||||
// first message, but here we inject it on an end node so we need to specify
|
||||
// the fromNode to get the reply back to the node we want.
|
||||
func (s *server) readStartupFolder() {
|
||||
|
||||
// Get the names of all the files in the startup folder.
|
||||
const startupFolder = "startup"
|
||||
filePaths, err := s.getFilePaths(startupFolder)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: readStartupFolder: unable to get filenames: %v", err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er)
|
||||
return
|
||||
}
|
||||
|
||||
for _, filePath := range filePaths {
|
||||
|
||||
// Read the content of each file.
|
||||
readBytes, err := func(filePath string) ([]byte, error) {
|
||||
fh, err := os.Open(filePath)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: failed to open file in startup folder: %v", err)
|
||||
return nil, er
|
||||
}
|
||||
defer fh.Close()
|
||||
|
||||
b, err := io.ReadAll(fh)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: failed to read file in startup folder: %v", err)
|
||||
return nil, er
|
||||
}
|
||||
|
||||
return b, nil
|
||||
}(filePath)
|
||||
|
||||
if err != nil {
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, err)
|
||||
continue
|
||||
}
|
||||
|
||||
readBytes = bytes.Trim(readBytes, "\x00")
|
||||
|
||||
// unmarshal the JSON into a struct
|
||||
sams, err := s.convertBytesToSAMs(readBytes)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: startup folder: malformed json read: %v", err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er)
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if fromNode field is specified, and remove the message if blank.
|
||||
for i := range sams {
|
||||
if sams[i].Message.FromNode == "" {
|
||||
sams = append(sams[:i], sams[i+1:]...)
|
||||
log.Printf(" error: missing from field in startup message\n")
|
||||
}
|
||||
|
||||
// Bounds check.
|
||||
if i == len(sams)-1 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Send the SAM struct to be picked up by the ring buffer.
|
||||
s.ringBufferBulkInCh <- sams
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// getFilePaths will get the names of all the messages in
|
||||
// the folder specified from current working directory.
|
||||
func (s *server) getFilePaths(dirName string) ([]string, error) {
|
||||
dirPath, err := os.Getwd()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error: startup folder: unable to get the working directory %v: %v", dirPath, err)
|
||||
}
|
||||
|
||||
dirPath = filepath.Join(dirPath, dirName)
|
||||
|
||||
// Check if the startup folder exist.
|
||||
if _, err := os.Stat(dirPath); os.IsNotExist(err) {
|
||||
err := os.MkdirAll(dirPath, 0700)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: failed to create startup folder: %v", err)
|
||||
return nil, er
|
||||
}
|
||||
}
|
||||
|
||||
fInfo, err := ioutil.ReadDir(dirPath)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: failed to get filenames in startup folder: %v", err)
|
||||
return nil, er
|
||||
}
|
||||
|
||||
filePaths := []string{}
|
||||
|
||||
for _, v := range fInfo {
|
||||
realpath := filepath.Join(dirPath, v.Name())
|
||||
filePaths = append(filePaths, realpath)
|
||||
}
|
||||
|
||||
return filePaths, nil
|
||||
}
|
||||
|
||||
// readSocket will read the .sock file specified.
|
||||
// It will take a channel of []byte as input, and it is in this
|
||||
// channel the content of a file that has changed is returned.
|
|
@ -274,6 +274,9 @@ func (s *server) Start() {
|
|||
// so we can cancel this context last, and not use the server.
|
||||
s.routeMessagesToProcess("./incomingBuffer.db")
|
||||
|
||||
// Check and enable read the messages specified in the startup folder.
|
||||
s.readStartupFolder()
|
||||
|
||||
}
|
||||
|
||||
// Will stop all processes started during startup.
|
||||
|
|
14
tui.go
14
tui.go
|
@ -456,7 +456,7 @@ func (t *tui) messageSlide(app *tview.Application) tview.Primitive {
|
|||
|
||||
// Add a dropdown menu to select message files to use.
|
||||
|
||||
msgsValues := getMessageNames(p.logForm)
|
||||
msgsValues := t.getMessageNames(p.logForm)
|
||||
|
||||
msgDropdownFunc := func(msgFileName string, index int) {
|
||||
filePath := filepath.Join("messages", msgFileName)
|
||||
|
@ -500,7 +500,7 @@ func (t *tui) messageSlide(app *tview.Application) tview.Primitive {
|
|||
p.selectMessage.AddFormItem(messageDropdown)
|
||||
|
||||
p.inputForm.AddButton("update message dropdown menu", func() {
|
||||
messageMessageValues := getMessageNames(p.logForm)
|
||||
messageMessageValues := t.getMessageNames(p.logForm)
|
||||
messageDropdown.SetLabel("message").SetOptions(messageMessageValues, msgDropdownFunc)
|
||||
})
|
||||
|
||||
|
@ -664,7 +664,7 @@ func (t *tui) messageSlide(app *tview.Application) tview.Primitive {
|
|||
fmt.Fprintf(p.logForm, "info: succesfully wrote message to file: %v\n", file)
|
||||
|
||||
// update the select message dropdown
|
||||
messageMessageValues := getMessageNames(p.logForm)
|
||||
messageMessageValues := t.getMessageNames(p.logForm)
|
||||
messageDropdown.SetLabel("message").SetOptions(messageMessageValues, msgDropdownFunc)
|
||||
// p.inputForm.Clear(false)
|
||||
|
||||
|
@ -726,7 +726,7 @@ func (t *tui) console(app *tview.Application) tview.Primitive {
|
|||
nodesDropdown.SetLabel("nodes").SetOptions(nodesList, nil)
|
||||
p.selectForm.AddFormItem(nodesDropdown)
|
||||
|
||||
msgsValues := getMessageNames(p.outputForm)
|
||||
msgsValues := t.getMessageNames(p.outputForm)
|
||||
|
||||
messageDropdown := tview.NewDropDown()
|
||||
messageDropdown.SetLabelColor(tcell.ColorIndianRed)
|
||||
|
@ -741,7 +741,7 @@ func (t *tui) console(app *tview.Application) tview.Primitive {
|
|||
}
|
||||
nodesDropdown.SetLabel("nodes").SetOptions(nodesList, nil)
|
||||
|
||||
msgsValues := getMessageNames(p.outputForm)
|
||||
msgsValues := t.getMessageNames(p.outputForm)
|
||||
messageDropdown.SetLabel("message").SetOptions(msgsValues, nil)
|
||||
})
|
||||
|
||||
|
@ -758,7 +758,7 @@ func (t *tui) console(app *tview.Application) tview.Primitive {
|
|||
}
|
||||
nodesDropdown.SetLabel("nodes").SetOptions(nodesList, nil)
|
||||
|
||||
messageValues := getMessageNames(p.outputForm)
|
||||
messageValues := t.getMessageNames(p.outputForm)
|
||||
messageDropdown.SetLabel("message").SetOptions(messageValues, nil)
|
||||
})
|
||||
|
||||
|
@ -839,7 +839,7 @@ func (t *tui) console(app *tview.Application) tview.Primitive {
|
|||
|
||||
// getMessageNames will get the names of all the messages in
|
||||
// the messages folder.
|
||||
func getMessageNames(outputForm *tview.TextView) []string {
|
||||
func (t *tui) getMessageNames(outputForm *tview.TextView) []string {
|
||||
// Create messages dropdown field.
|
||||
fInfo, err := ioutil.ReadDir("messages")
|
||||
if err != nil {
|
||||
|
|
Loading…
Add table
Reference in a new issue