// Mirror of https://github.com/postmannen/ctrl.git (synced 2025-01-07 04:49:17 +00:00).
// File: ctrl/getmessagefromfile.go

package steward
import (
"bufio"
"encoding/json"
"fmt"
"io"
"log"
"os"
"github.com/fsnotify/fsnotify"
)
// getMessagesFromFile will start a file watcher for the given directory
// and filename. It will take a channel of []byte as input, and it is
// in this channel the content of a file that has changed is returned.
func (s *server) getMessagesFromFile(directoryToCheck string, fileName string, inputFromFileCh chan []subjectAndMessage) {
fileUpdated := make(chan bool)
go fileWatcherStart(directoryToCheck, fileUpdated)
2021-02-05 06:25:12 +00:00
for range fileUpdated {
//load file, read it's content
b, err := readTruncateMessageFile(fileName)
if err != nil {
log.Printf("error: reading file: %v", err)
}
// Start on top again if the file did not contain
// any data.
if len(b) == 0 {
continue
}
2021-02-05 06:25:12 +00:00
// unmarshal the JSON into a struct
js, err := jsonFromFileData(b)
if err != nil {
2021-03-12 05:48:48 +00:00
er := fmt.Errorf("error: malformed json: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(s.newMessagesCh, node(s.nodeName), er)
continue
}
for i := range js {
2021-02-10 06:25:44 +00:00
fmt.Printf("*** Checking message found in file: messageType type: %T, messagetype contains: %#v\n", js[i].Subject.CommandOrEvent, js[i].Subject.CommandOrEvent)
// Fill in the value for the FromNode field, so the receiver
// can check this field to know where it came from.
2021-02-09 14:08:04 +00:00
js[i].Message.FromNode = node(s.nodeName)
2021-02-05 06:25:12 +00:00
}
// Send the data back to be consumed
inputFromFileCh <- js
2021-02-05 06:25:12 +00:00
}
2021-03-12 05:48:48 +00:00
er := fmt.Errorf("error: getMessagesFromFile stopped")
log.Printf("%v\n", er)
sendErrorLogMessage(s.newMessagesCh, node(s.nodeName), er)
}
type subjectAndMessage struct {
2021-02-04 10:46:58 +00:00
Subject `json:"subject" yaml:"subject"`
Message `json:"message" yaml:"message"`
}
2021-03-12 05:48:48 +00:00
// jsonFromFileData will range over the message given in json format. For
// each element found the Message type will be converted into a SubjectAndMessage
// type value and appended to a slice, and the slice is returned to the caller.
func jsonFromFileData(b []byte) ([]subjectAndMessage, error) {
MsgSlice := []Message{}
err := json.Unmarshal(b, &MsgSlice)
if err != nil {
2021-02-04 10:46:58 +00:00
return nil, fmt.Errorf("error: unmarshal of file failed: %#v", err)
}
sam := []subjectAndMessage{}
2021-02-25 10:08:05 +00:00
// Range over all the messages parsed from json, and create a subject for
// each message.
for _, m := range MsgSlice {
sm, err := newSAM(m)
if err != nil {
log.Printf("error: jsonFromFileData: %v\n", err)
continue
}
sam = append(sam, sm)
}
return sam, nil
}
2021-03-11 05:34:36 +00:00
// newSAM will look up the correct values and value types to
2021-03-10 06:11:14 +00:00
// be used in a subject for a Message, and return the a combined structure
// of type subjectAndMessage.
func newSAM(m Message) (subjectAndMessage, error) {
2021-03-10 06:11:14 +00:00
// We need to create a tempory method type to look up the kind for the
// real method for the message.
var mt Method
//fmt.Printf("-- \n getKind contains: %v\n\n", mt.getHandler(m.Method).getKind())
tmpH := mt.getHandler(m.Method)
if tmpH == nil {
return subjectAndMessage{}, fmt.Errorf("error: method value did not exist in map")
}
2021-03-10 06:11:14 +00:00
sub := Subject{
ToNode: string(m.ToNode),
CommandOrEvent: tmpH.getKind(),
2021-03-10 06:11:14 +00:00
Method: m.Method,
}
sm := subjectAndMessage{
Subject: sub,
Message: m,
}
return sm, nil
2021-03-10 06:11:14 +00:00
}
// readTruncateMessageFile reads all the content of fileName, empties the
// file, and returns what was read.
//
// The file is created if it does not exist. The content is returned as the
// file's lines concatenated, with each line's trailing "\n" (and a "\r"
// directly before it) removed. Lines of any length are accepted.
//
// If opening or reading fails, an error is returned and the file is not
// truncated, so unread messages are not discarded.
func readTruncateMessageFile(fileName string) ([]byte, error) {
	// The last argument is the permission used when the file gets created.
	// It has to grant the owner read and write access, otherwise the next
	// call would be unable to reopen the file this call created.
	f, err := os.OpenFile(fileName, os.O_APPEND|os.O_RDWR|os.O_CREATE, 0600)
	if err != nil {
		return nil, fmt.Errorf("readTruncateMessageFile: failed to open file: %w", err)
	}
	defer f.Close()

	// bufio.Reader is used instead of bufio.Scanner because the scanner
	// gives up on lines longer than its token limit, which would make the
	// truncate below throw away everything that was not read.
	r := bufio.NewReader(f)
	content := []byte{}
	for {
		line, readErr := r.ReadBytes('\n')

		// Strip the line terminator from whatever was read.
		if n := len(line); n > 0 && line[n-1] == '\n' {
			line = line[:n-1]
		}
		if n := len(line); n > 0 && line[n-1] == '\r' {
			line = line[:n-1]
		}
		content = append(content, line...)

		if readErr == io.EOF {
			break
		}
		if readErr != nil {
			return nil, fmt.Errorf("readTruncateMessageFile: failed to read file: %w", readErr)
		}
	}

	// Empty the file after all is read.
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		return nil, fmt.Errorf("f.Seek failed: %v", err)
	}
	if err := f.Truncate(0); err != nil {
		return nil, fmt.Errorf("f.Truncate failed: %v", err)
	}

	return content, nil
}
2021-03-12 05:48:48 +00:00
// Start the file watcher that will check if the in pipe for new operator
// messages are updated with new content.
func fileWatcherStart(directoryToCheck string, fileUpdated chan bool) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Println("Failed fsnotify.NewWatcher")
return
}
defer watcher.Close()
done := make(chan bool)
go func() {
//Give a true value to updated so it reads the file the first time.
fileUpdated <- true
for {
select {
case event := <-watcher.Events:
if event.Op&fsnotify.Write == fsnotify.Write {
2021-03-25 11:33:53 +00:00
// log.Println("info: steward.sock file updated, processing input: ", event.Name)
//testing with an update chan to get updates
fileUpdated <- true
}
case err := <-watcher.Errors:
log.Println("error:", err)
}
}
}()
err = watcher.Add(directoryToCheck)
if err != nil {
log.Printf("error: watcher add: %v\n", err)
}
<-done
}