1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

simplified the structure of the infile.txt json format

This commit is contained in:
postmannen 2021-02-10 10:14:49 +01:00
parent 1a3b19071e
commit dce6152496
9 changed files with 100 additions and 61 deletions

1
.gitignore vendored
View file

@ -1,3 +1,2 @@
textlogging.go
ship1/
ship2/

View file

@ -1,15 +1,8 @@
[
{
"subject":
{
"node":"central",
"commandOrEvent":"event",
"method":"textlogging"
},
"message":
{
"data": ["some message sent from a ship"],
"commandOrEvent":"Event"
}
"toNode": "central",
"data": ["some message sent from a ship\n"],
"commandOrEvent":"event",
"method":"textLogging"
}
]

View file

@ -1,15 +1,10 @@
[
{
"subject":
{
"node":"ship1",
"commandOrEvent":"command",
"method":"shellcommand"
},
"message":
{
"data": ["bash","-c","ls -l ../"],
"commandOrEvent":"Command"
}
"toNode": "ship1",
"data": ["bash","-c","ls -l ../"],
"commandOrEvent":"command",
"method":"shellCommand"
}
]

View file

@ -1,15 +1,9 @@
[
{
"subject":
{
"node":"ship2",
"commandOrEvent":"command",
"method":"shellcommand"
},
"message":
{
"data": ["bash","-c","tree ../"],
"commandOrEvent":"Command"
}
"toNode": "ship2",
"data": ["bash","-c","tree ../"],
"commandOrEvent":"Command",
"method":"shellCommand"
}
]

View file

@ -14,7 +14,7 @@ import (
// getMessagesFromFile will start a file watcher for the given directory
// and filename. It will take a channel of []byte as input, and it is
// in this channel the content of a file that has changed is returned.
func (s *server) getMessagesFromFile(directoryToCheck string, fileName string, fileContentCh chan []jsonFromFile) {
func (s *server) getMessagesFromFile(directoryToCheck string, fileName string, fileContentCh chan []subjectAndMessage) {
fileUpdated := make(chan bool)
go fileWatcherStart(directoryToCheck, fileUpdated)
@ -34,12 +34,15 @@ func (s *server) getMessagesFromFile(directoryToCheck string, fileName string, f
// unmarshal the JSON into a struct
js, err := jsonFromFileData(b)
fmt.Printf("*** OUTPUT AFTER UNMARSHALING JSON: %#v\n", js)
if err != nil {
log.Printf("%v\n", err)
}
for i := range js {
fmt.Printf("*** Checking message found in file: messageType type: %T, messagetype contains: %#v\n", js[i].Subject.CommandOrEvent, js[i].Subject.CommandOrEvent)
// Fill in the value for the FromNode field, so the receiver
// can check this field to know where it came from.
js[i].Message.FromNode = node(s.nodeName)
}
@ -48,24 +51,39 @@ func (s *server) getMessagesFromFile(directoryToCheck string, fileName string, f
}
}
type jsonFromFile struct {
type subjectAndMessage struct {
Subject `json:"subject" yaml:"subject"`
Message `json:"message" yaml:"message"`
}
func jsonFromFileData(b []byte) ([]jsonFromFile, error) {
JS := []jsonFromFile{}
func jsonFromFileData(b []byte) ([]subjectAndMessage, error) {
MsgSlice := []Message{}
err := json.Unmarshal(b, &JS)
err := json.Unmarshal(b, &MsgSlice)
fmt.Printf("*** OUTPUT DIRECTLY AFTER UNMARSHALING JSON: %#v\n", MsgSlice)
// TODO: Look into also marshaling from yaml and toml later
//err := yaml.Unmarshal(b, &JS)
//err := toml.Unmarshal(b, &JS)
//fmt.Printf("%#v\n", JS)
if err != nil {
return nil, fmt.Errorf("error: unmarshal of file failed: %#v", err)
}
return JS, nil
sam := []subjectAndMessage{}
for _, m := range MsgSlice {
s := Subject{
Node: string(m.ToNode),
CommandOrEvent: m.CommandOrEvent,
Method: m.Method,
}
sm := subjectAndMessage{
Subject: s,
Message: m,
}
sam = append(sam, sm)
}
return sam, nil
}
// readTruncateMessageFile, will read all the messages in the given
@ -118,7 +136,7 @@ func fileWatcherStart(directoryToCheck string, fileUpdated chan bool) {
select {
case event := <-watcher.Events:
if event.Op&fsnotify.Write == fsnotify.Write {
log.Println("info: infile updated, processing input: ", event.Name)
log.Println("info: inmsg.txt file updated, processing input: ", event.Name)
//testing with an update chan to get updates
fileUpdated <- true
}

View file

@ -6,6 +6,7 @@ import (
"encoding/gob"
"fmt"
"log"
"os"
"os/exec"
"sync"
"time"
@ -28,16 +29,15 @@ type CommandOrEvent string
// It might make more sense to implement these as two
// individual subjects.
const (
// shellCommand, command that will just wait for an
// Command, command that will just wait for an
// ack, and nothing of the output of the command are
// delivered back in the reply ack message.
// The message should contain the unique ID of the
// command.
Command CommandOrEvent = "command"
// shellCommand, wait for and return the output
// of the command in the ACK message. This means
// Event, wait for and return the ACK message. This means
// that the command should be executed immediately
// and that we should get the confirmation that it
// and that we should get the confirmation if it
// was successful or not.
Event CommandOrEvent = "event"
// eventCommand, just wait for the ACK that the
@ -46,6 +46,7 @@ const (
)
type Message struct {
ToNode node `json:"toNode" yaml:"toNode"`
// The Unique ID of the message
ID int `json:"id" yaml:"id"`
// The actual data in the message
@ -54,7 +55,9 @@ type Message struct {
Data []string `json:"data" yaml:"data"`
// The type of the message being sent
CommandOrEvent CommandOrEvent `json:"commandOrEvent" yaml:"commandOrEvent"`
FromNode node
// method, what is this message doing, etc. shellCommand, syslog, etc.
Method string `json:"method" yaml:"method"`
FromNode node
}
// server is the structure that will hold the state about spawned
@ -70,7 +73,7 @@ type server struct {
mu sync.Mutex
// The channel where we receive new messages from the outside to
// insert into the system for being processed
newMessagesCh chan []jsonFromFile
newMessagesCh chan []subjectAndMessage
// errorCh is used to report errors from a process
// NB: Implementing this as an int to report for testing
errorCh chan errProcess
@ -91,7 +94,7 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) {
nodeName: nodeName,
natsConn: conn,
processes: make(map[subjectName]process),
newMessagesCh: make(chan []jsonFromFile),
newMessagesCh: make(chan []subjectAndMessage),
errorCh: make(chan errProcess, 2),
logCh: make(chan []byte),
}
@ -113,14 +116,14 @@ func (s *server) Start() {
// Start the checking the input file for new messages from operator.
go s.getMessagesFromFile("./", "inmsg.txt", s.newMessagesCh)
// Start the textlogging service that will run on the subscribers
// Start the textLogging service that will run on the subscribers
// TODO: Figure out how to structure event services like these
go s.startTextLogging(s.logCh)
// Start a subscriber for shellCommand messages
{
fmt.Printf("nodeName: %#v\n", s.nodeName)
sub := newSubject(s.nodeName, "command", "shellcommand")
sub := newSubject(s.nodeName, "command", "shellCommand")
proc := s.processPrepareNew(sub, s.errorCh, processKindSubscriber)
// fmt.Printf("*** %#v\n", proc)
go s.processSpawnWorker(proc)
@ -129,7 +132,7 @@ func (s *server) Start() {
// Start a subscriber for textLogging messages
{
fmt.Printf("nodeName: %#v\n", s.nodeName)
sub := newSubject(s.nodeName, "event", "textlogging")
sub := newSubject(s.nodeName, "event", "textLogging")
proc := s.processPrepareNew(sub, s.errorCh, processKindSubscriber)
// fmt.Printf("*** %#v\n", proc)
go s.processSpawnWorker(proc)
@ -208,7 +211,7 @@ type Subject struct {
Node string `json:"node" yaml:"node"`
// messageType, command/event
CommandOrEvent CommandOrEvent `json:"commandOrEvent" yaml:"commandOrEvent"`
// method, what is this message doing, etc. shellcommand, syslog, etc.
// method, what is this message doing, etc. shellCommand, syslog, etc.
Method string `json:"method" yaml:"method"`
// messageCh is the channel for receiving new content to be sent
messageCh chan Message
@ -330,7 +333,6 @@ func (s *server) processSpawnWorker(proc process) {
// handle subscriber workers
if proc.processKind == processKindSubscriber {
//subject := fmt.Sprintf("%s.%s.%s", s.nodeName, "command", "shellcommand")
subject := string(proc.subject.name())
// Subscribe will start up a Go routine under the hood calling the
@ -356,7 +358,7 @@ func messageDeliver(proc process, message Message, natsConn *nats.Conn) {
msg := &nats.Msg{
Subject: string(proc.subject.name()),
// Subject: fmt.Sprintf("%s.%s.%s", proc.node, "command", "shellcommand"),
// Subject: fmt.Sprintf("%s.%s.%s", proc.node, "command", "shellCommand"),
// Structure of the reply message are:
// reply.<nodename>.<message type>.<method>
Reply: fmt.Sprintf("reply.%s", proc.subject.name()),
@ -371,6 +373,7 @@ func messageDeliver(proc process, message Message, natsConn *nats.Conn) {
subReply, err := natsConn.SubscribeSync(msg.Reply)
if err != nil {
log.Printf("error: nc.SubscribeSync failed: %v\n", err)
os.Exit(1)
continue
}
@ -436,7 +439,7 @@ func (s *server) subscriberHandler(natsConn *nats.Conn, node string, msg *nats.M
// that there was a problem like missing method to handle a specific
// method etc.
switch {
case message.CommandOrEvent == "Command":
case message.CommandOrEvent == Command:
// Since the command to execute is at the first position in the
// slice we need to slice it out. The arguments are at the
@ -453,7 +456,7 @@ func (s *server) subscriberHandler(natsConn *nats.Conn, node string, msg *nats.M
// Send a confirmation message back to the publisher
natsConn.Publish(msg.Reply, []byte("confirmed from: "+node+": "+fmt.Sprintf("%v\n%s", message.ID, out)))
case message.CommandOrEvent == "Event":
case message.CommandOrEvent == Event:
fmt.Printf("info: sending over the message %#v\n", message)
for _, d := range message.Data {

View file

@ -1 +0,0 @@
package steward

30
textlogging.go Normal file
View file

@ -0,0 +1,30 @@
package steward
import (
"fmt"
"log"
"os"
)
// startTextLogging will open a file ready for writing log messages to,
// and the input for writing to the file is given via the logCh argument.
func (s *server) startTextLogging(logCh chan []byte) {
fileName := "./textlogging.log"
f, err := os.OpenFile(fileName, os.O_APPEND|os.O_RDWR|os.O_CREATE, os.ModeAppend)
if err != nil {
log.Printf("Failed to open file %v\n", err)
return
}
defer f.Close()
for b := range logCh {
fmt.Printf("***** Trying to write to file : %s\n\n", b)
_, err := f.Write(b)
f.Sync()
if err != nil {
log.Printf("Failed to open file %v\n", err)
}
}
}

View file

@ -1 +1,9 @@
some message sent from a shipsome message sent from a shipsome message sent from a shipsome message sent from a ship
some message sent from a shipsome message sent from a shipsome message sent from a shipsome message sent from a shipsome message sent from a shipsome message sent from a shipsome message sent from a ship
some message sent from a ship
some message sent from a ship
some message sent from a ship
some message sent from a ship
some message sent from a ship
some message sent from a ship
some message sent from a ship
some message sent from a ship