1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-31 01:24:31 +00:00

input from file seems to initially work

This commit is contained in:
postmannen 2021-02-04 13:26:10 +01:00
parent 86bfb014b3
commit 98d5c4b3da
11 changed files with 94 additions and 45 deletions

View file

@ -58,7 +58,8 @@ Subject naming are case sensitive, and can not contain the space are the tab cha
Nodename: Are the hostname of the device. This do not have to be resolvable via DNS, it is just a unique name for the host to receive the message.
Command/Event: Are type of message sent. `command` or `event`. Description of the differences are mentioned earlier.
Command/Event: Are type of message sent. `command` or `event`. Description of the differences are mentioned earlier.\
Info: The command/event which is called a MessageType are present in both the Subject structure and the Message structure. The reason for this is that it is used both in the naming of a subject, and in the message for knowing what kind of message it is and how to handle it.
Method: Are the functionality the message provide. Example could be `shellcommand` or `syslogforwarding`

View file

@ -0,0 +1,16 @@
[
{
"subject":
{
"node":"ship1",
"messageType":"command",
"method":"shellcommand",
"domain":"shell"
},
"message":
{
"data": ["bash","-c","uname -a"],
"messageType":"Command"
}
}
]

View file

@ -0,0 +1,16 @@
[
{
"subject":
{
"node":"ship2",
"messageType":"command",
"method":"shellcommand",
"domain":"shell"
},
"message":
{
"data": ["bash","-c","tree"],
"messageType":"Command"
}
}
]

View file

@ -10,7 +10,7 @@
"message":
{
"data": ["bash","-c","uname -a"],
"messageType":"eventReturnAck"
"messageType":"Command"
}
},
{
@ -24,7 +24,7 @@
"message":
{
"data": ["bash","-c","uname -a"],
"messageType":"eventReturnAck"
"messageType":"Command"
}
}
]

3
go.mod
View file

@ -3,12 +3,9 @@ module github.com/RaaLabs/steward
go 1.15
require (
github.com/BurntSushi/toml v0.3.1
github.com/fsnotify/fsnotify v1.4.9
github.com/golang/protobuf v1.4.3 // indirect
github.com/nats-io/nats-server/v2 v2.1.9 // indirect
github.com/nats-io/nats.go v1.10.0
github.com/pelletier/go-toml v1.8.1
google.golang.org/protobuf v1.25.0 // indirect
gopkg.in/yaml.v2 v2.4.0
)

7
go.sum
View file

@ -3,7 +3,6 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
@ -39,8 +38,6 @@ github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pelletier/go-toml v1.8.1 h1:1Nf83orprkJyknT6h7zbuEGUEjcyVlCxSUGTENmNCRM=
github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
@ -91,9 +88,5 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View file

@ -23,13 +23,13 @@ const (
// delivered back in the reply ack message.
// The message should contain the unique ID of the
// command.
CommandReturnOutput MessageType = "commandReturnOutput"
Command MessageType = "command"
// shellCommand, wait for and return the output
// of the command in the ACK message. This means
// that the command should be executed immediately
// and that we should get the confirmation that it
// was successful or not.
EventReturnAck MessageType = "eventReturnAck"
Event MessageType = "event"
// eventCommand, just wait for the ACK that the
// message is received. What action happens on the
// receiving side is up to the received to decide.
@ -54,8 +54,12 @@ type server struct {
processes map[subjectName]process
// The last processID created
lastProcessID int
nodeName string
mu sync.Mutex
// The name of the node
nodeName string
mu sync.Mutex
// The channel where we receive new messages from the outside to
// insert into the system for being processed
newMessagesCh chan []jsonFromFile
}
// newServer will prepare and return a server type
@ -66,9 +70,10 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) {
}
s := &server{
nodeName: nodeName,
natsConn: conn,
processes: make(map[subjectName]process),
nodeName: nodeName,
natsConn: conn,
processes: make(map[subjectName]process),
newMessagesCh: make(chan []jsonFromFile),
}
// Start the error handler
@ -95,23 +100,22 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) {
func (s *server) PublisherStart() {
// start the checking of files for input messages
fileReadCh := make((chan []jsonFromFile))
go getMessagesFromFile("./", "inmsg.txt", fileReadCh)
go getMessagesFromFile("./", "inmsg.txt", s.newMessagesCh)
// TODO: For now we just print content of the files read.
// Replace this with a broker function that will know how
// send it on to the correct publisher.
go func() {
for v := range fileReadCh {
// Check if there are new content read from file input
fmt.Printf("received: %#v\n", v)
}
}()
// go func() {
// for v := range s.newMessagesCh {
// // Check if there are new content read from file input
// fmt.Printf("received: %#v\n", v)
//
// }
// }()
// Prepare and start a single process
{
sub := newSubject("btship1", "command", "shellcommand", "shell")
sub := newSubject("ship1", "command", "shellcommand", "shell")
proc := s.processPrepareNew(sub)
// fmt.Printf("*** %#v\n", proc)
go s.processSpawn(proc)
@ -119,7 +123,7 @@ func (s *server) PublisherStart() {
// Prepare and start a single process
{
sub := newSubject("btship2", "command", "shellcommand", "shell")
sub := newSubject("ship2", "command", "shellcommand", "shell")
proc := s.processPrepareNew(sub)
// fmt.Printf("*** %#v\n", proc)
go s.processSpawn(proc)
@ -127,18 +131,23 @@ func (s *server) PublisherStart() {
// Simulate generating some commands to be sent as messages to nodes.
go func() {
for {
m := Message{
Data: []string{"bash", "-c", "uname -a"},
MessageType: EventReturnAck,
}
subjName := subjectName("btship1.command.shellcommand.shell")
_, ok := s.processes[subjName]
if ok {
s.processes[subjName].subject.messageCh <- m
} else {
time.Sleep(time.Millisecond * 500)
continue
for v := range s.newMessagesCh {
for _, vv := range v {
m := vv.Message
subjName := vv.Subject.name()
fmt.Printf("** message: %v, ** subject: %v\n", m, vv.Subject)
_, ok := s.processes[subjName]
if ok {
log.Printf("info: found the specific subject: %v\n", subjName)
fmt.Printf("* Before Putting incomming message on subject.messageCh\n")
s.processes[subjName].subject.messageCh <- m
fmt.Printf("* After Putting incomming message on subject.messageCh\n")
} else {
log.Printf("info: did not find that specific subject: %v\n", subjName)
time.Sleep(time.Millisecond * 500)
continue
}
}
}
}()
@ -148,7 +157,7 @@ func (s *server) PublisherStart() {
// for {
// m := Message{
// Data: []string{"bash", "-c", "uname -a"},
// MessageType: eventReturnAck,
// MessageType: Event,
// }
// subjName := subjectName("btship2.command.shellcommand.shell")
// _, ok := s.processes[subjName]
@ -262,7 +271,9 @@ func (s *server) processSpawn(proc process) {
// messages from the message-pickup-process.
for {
// Wait and read the next message on the message channel
fmt.Printf("* Before checking messageCh inside process\n")
m := <-proc.subject.messageCh
fmt.Printf("* After checking messageCh inside process: %v\n", m)
m.ID = s.processes[proc.subject.name()].messageID
messageDeliver(proc, m, s.natsConn)

View file

@ -4,6 +4,7 @@ import (
"bytes"
"encoding/gob"
"fmt"
"log"
"os"
"os/exec"
@ -33,7 +34,7 @@ func (s *server) RunSubscriber() {
msg := <-reqMsgCh
fmt.Printf("%v\n", msg)
switch msg.MessageType {
case EventReturnAck:
case "Command":
// Since the command to execute is at the first position in the
// slice we need to slice it out. The arguments are at the
// remaining positions.
@ -45,6 +46,20 @@ func (s *server) RunSubscriber() {
if err != nil {
fmt.Printf("error: execution of command failed: %v\n", err)
}
case "Event":
// Since the command to execute is at the first position in the
// slice we need to slice it out. The arguments are at the
// remaining positions.
c := msg.Data[0]
a := msg.Data[1:]
cmd := exec.Command(c, a...)
cmd.Stdout = os.Stdout
err := cmd.Start()
if err != nil {
fmt.Printf("error: execution of command failed: %v\n", err)
}
default:
log.Printf("info: did not find that specific type of command: %#v\n", msg.MessageType)
}
}