1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-18 21:59:30 +00:00

refactoring for reading input from file...and more

This commit is contained in:
postmannen 2021-02-03 22:08:28 +01:00
parent efb781ccf0
commit dc2352cab7
7 changed files with 131 additions and 60 deletions

View file

@ -2,6 +2,7 @@ package steward
import (
"bufio"
"encoding/json"
"fmt"
"io"
"log"
@ -13,7 +14,7 @@ import (
// getMessagesFromFile will start a file watcher for the given directory
// and filename. It will take a channel of []byte as input, and it is
// in this channel the content of a file that has changed is returned.
func getMessagesFromFile(directoryToCheck string, fileName string, fileContentCh chan []byte) {
func getMessagesFromFile(directoryToCheck string, fileName string, fileContentCh chan []jsonFromFile) {
fileUpdated := make(chan bool)
go fileWatcherStart(directoryToCheck, fileUpdated)
@ -26,13 +27,36 @@ func getMessagesFromFile(directoryToCheck string, fileName string, fileContentCh
log.Printf("error: reading file: %v", err)
}
fileContentCh <- b
fmt.Printf("File content read: %s\n", b)
// unmarshal the JSON into a struct
js, err := jsonFromFileData(b)
if err != nil {
log.Printf("%v\n", err)
}
// Send the data back to be consumed
fileContentCh <- js
}
}
}
type jsonFromFile struct {
Subject `json:"subject"`
Message `json:"message"`
}
func jsonFromFileData(b []byte) ([]jsonFromFile, error) {
JS := []jsonFromFile{}
//err := yaml.Unmarshal(b, &JS)
err := json.Unmarshal(b, &JS)
if err != nil {
return nil, fmt.Errorf("error: json unmarshal of file failed: %v", err)
}
return JS, nil
}
// readTruncateMessageFile, will read all the messages in the given
// file, and truncate the file after read.
// A []byte will be returned with the content read.
@ -53,20 +77,14 @@ func readTruncateMessageFile(fileName string) ([]byte, error) {
lines = append(lines, scanner.Bytes()...)
}
fmt.Printf("*** DEBUG : %s\n", lines)
fmt.Printf("read: %s\n", lines)
// empty the file after all is read
ret, err := f.Seek(0, io.SeekStart)
_, err = f.Seek(0, io.SeekStart)
if err != nil {
return nil, fmt.Errorf("f.Seek failed: %v", err)
}
fmt.Printf("** ret=%v\n", ret)
err = f.Truncate(0)
if err != nil {
fmt.Printf("******* %#v\n", err)
return nil, fmt.Errorf("f.Truncate failed: %v", err)
}

2
go.mod
View file

@ -4,8 +4,10 @@ go 1.15
require (
github.com/fsnotify/fsnotify v1.4.9
github.com/ghodss/yaml v1.0.0
github.com/golang/protobuf v1.4.3 // indirect
github.com/nats-io/nats-server/v2 v2.1.9 // indirect
github.com/nats-io/nats.go v1.10.0
google.golang.org/protobuf v1.25.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

5
go.sum
View file

@ -6,6 +6,8 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@ -87,5 +89,8 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

30
orig.json Normal file
View file

@ -0,0 +1,30 @@
[
{
"subject":
{
"node":"ship1",
"messageType":"command",
"method":"shellcommand",
"domain":"shell"
},
"message":
{
"data": ["bash","-c","uname -a"],
"messageType":"eventReturnAck"
}
},
{
"subject":
{
"node":"ship2",
"messageType":"command",
"method":"shellcommand",
"domain":"shell"
},
"message":
{
"data": ["bash","-c","uname -a"],
"messageType":"eventReturnAck"
}
}
]

23
orig.yaml Normal file
View file

@ -0,0 +1,23 @@
---
- subject:
node: ship1
messageType: command
method: shellcommand
domain: shell
message:
data:
- bash
- "-c"
- uname -a
messageType: eventReturnAck
- subject:
node: ship2
messageType: command
method: shellcommand
domain: shell
message:
data:
- bash
- "-c"
- uname -a
messageType: eventReturnAck

View file

@ -12,9 +12,7 @@ import (
"github.com/nats-io/nats.go"
)
var mu sync.Mutex
type messageType int
type MessageType string
// TODO: Figure it makes sense to have these types at all.
// It might make more sense to implement these as two
@ -25,13 +23,13 @@ const (
// delivered back in the reply ack message.
// The message should contain the unique ID of the
// command.
commandReturnOutput messageType = iota
CommandReturnOutput MessageType = "commandReturnOutput"
// shellCommand, wait for and return the output
// of the command in the ACK message. This means
// that the command should be executed immediately
// and that we should get the confirmation that it
// was successful or not.
eventReturnAck messageType = iota
EventReturnAck MessageType = "eventReturnAck"
// eventCommand, just wait for the ACK that the
// message is received. What action happens on the
// receiving side is up to the received to decide.
@ -39,13 +37,13 @@ const (
type Message struct {
// The Unique ID of the message
ID int
ID int `json:"id"`
// The actual data in the message
// TODO: Change this to a slice instead...or maybe use an
// interface type here to handle several data types ?
Data []string
Data []string `json:"data"`
// The type of the message being sent
MessageType messageType
MessageType MessageType `json:"messageType"`
}
// server is the structure that will hold the state about spawned
@ -57,6 +55,7 @@ type server struct {
// The last processID created
lastProcessID int
nodeName string
mu sync.Mutex
}
// newServer will prepare and return a server type
@ -72,6 +71,9 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) {
processes: make(map[subjectName]process),
}
// Start the error handler
// TODO: For now it will just print the error messages to the
// console.
go func() {
for {
@ -93,29 +95,23 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) {
func (s *server) PublisherStart() {
// start the checking of files for input messages
fileReadCh := make((chan []byte))
fileReadCh := make((chan []jsonFromFile))
go getMessagesFromFile("./", "inmsg.txt", fileReadCh)
// TODO: For now we just print content of the files read.
// Replace this whit a broker function that will know how
// Replace this with a broker function that will know how
// send it on to the correct publisher.
go func() {
for b := range fileReadCh {
for v := range fileReadCh {
// Check if there are new content read from file input
fmt.Printf("received: %s\n", b)
fmt.Printf("received: %#v\n", v)
}
}()
// Prepare and start a single process
{
sub := subject{
node: "btship1",
messageType: "command",
method: "shellcommand",
domain: "shell",
messageCh: make(chan Message),
}
sub := newSubject("btship1", "command", "shellcommand", "shell")
proc := s.processPrepareNew(sub)
// fmt.Printf("*** %#v\n", proc)
go s.processSpawn(proc)
@ -123,13 +119,7 @@ func (s *server) PublisherStart() {
// Prepare and start a single process
{
sub := subject{
node: "btship2",
messageType: "command",
method: "shellcommand",
domain: "shell",
messageCh: make(chan Message),
}
sub := newSubject("btship2", "command", "shellcommand", "shell")
proc := s.processPrepareNew(sub)
// fmt.Printf("*** %#v\n", proc)
go s.processSpawn(proc)
@ -140,7 +130,7 @@ func (s *server) PublisherStart() {
for {
m := Message{
Data: []string{"bash", "-c", "uname -a"},
MessageType: eventReturnAck,
MessageType: EventReturnAck,
}
subjName := subjectName("btship1.command.shellcommand.shell")
_, ok := s.processes[subjName]
@ -179,26 +169,39 @@ type node string
// subject contains the representation of a subject to be used with one
// specific process
type subject struct {
type Subject struct {
// node, the name of the node
node string
Node string `json:"node"`
// messageType, command/event
messageType string
MessageType string `json:"messageType"`
// method, what is this message doing, etc. shellcommand, syslog, etc.
method string
Method string `json:"method"`
// domain is used to differentiate services. Like there can be more
// logging services, but rarely more logging services for the same
// thing. Domain is here used to differentiate the the services and
// tell with one word what it is for.
domain string
Domain string `json:"domain"`
// messageCh is the channel for receiving new content to be sent
messageCh chan Message
}
// newSubject will return a new variable of the type subject, and insert
// all the values given as arguments. It will also create the channel
// to receive new messages on the specific subject.
func newSubject(node string, messageType string, method string, domain string) Subject {
return Subject{
Node: node,
MessageType: messageType,
Method: method,
Domain: domain,
messageCh: make(chan Message),
}
}
type subjectName string
func (s subject) name() subjectName {
return subjectName(fmt.Sprintf("%s.%s.%s.%s", s.node, s.messageType, s.method, s.domain))
func (s Subject) name() subjectName {
return subjectName(fmt.Sprintf("%s.%s.%s.%s", s.Node, s.MessageType, s.Method, s.Domain))
}
// process are represent the communication to one individual host
@ -207,7 +210,7 @@ type process struct {
// the subject used for the specific process. One process
// can contain only one sender on a message bus, hence
// also one subject
subject subject
subject Subject
// Put a node here to be able know the node a process is at.
// NB: Might not be needed later on.
node node
@ -223,13 +226,13 @@ type process struct {
// prepareNewProcess will set the the provided values and the default
// values for a process.
func (s *server) processPrepareNew(subject subject) process {
func (s *server) processPrepareNew(subject Subject) process {
// create the initial configuration for a sessions communicating with 1 host process.
s.lastProcessID++
proc := process{
messageID: 0,
subject: subject,
node: node(subject.node),
node: node(subject.Node),
processID: s.lastProcessID,
errorCh: make(chan string),
//messageCh: make(chan Message),
@ -242,12 +245,12 @@ func (s *server) processPrepareNew(subject subject) process {
// the next available ID, and also add the process to the processes
// map.
func (s *server) processSpawn(proc process) {
mu.Lock()
s.mu.Lock()
// We use the full name of the subject to identify a unique
// process. We can do that since a process can only handle
// one message queue.
s.processes[proc.subject.name()] = proc
mu.Unlock()
s.mu.Unlock()
// Loop creating one new message every second to simulate getting new
// messages to deliver.
@ -258,7 +261,6 @@ func (s *server) processSpawn(proc process) {
// is listened on in the for loop below could be used to receive the
// messages from the message-pickup-process.
for {
// m := getMessageToDeliver()
// Wait and read the next message on the message channel
m := <-proc.subject.messageCh
m.ID = s.processes[proc.subject.name()].messageID
@ -277,15 +279,6 @@ func (s *server) processSpawn(proc process) {
}
}
// get MessageToDeliver will pick up the next message to be created.
// TODO: read this from local file or rest or....?
func getMessageToDeliver() Message {
return Message{
Data: []string{"bash", "-c", "uname -a"},
MessageType: eventReturnAck,
}
}
func messageDeliver(proc process, message Message, natsConn *nats.Conn) {
for {
dataPayload, err := gobEncodePayload(message)

View file

@ -33,7 +33,7 @@ func (s *server) RunSubscriber() {
msg := <-reqMsgCh
fmt.Printf("%v\n", msg)
switch msg.MessageType {
case eventReturnAck:
case EventReturnAck:
// Since the command to execute is at the first position in the
// slice we need to slice it out. The arguments are at the
// remaining positions.