1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-05 20:09:16 +00:00

unix socket for new mesages, changed naming of channels

This commit is contained in:
postmannen 2021-03-29 13:36:30 +02:00
parent 0d822b0751
commit ed00f247ae
12 changed files with 101 additions and 170 deletions

1
.gitignore vendored
View file

@ -4,3 +4,4 @@ tmp/
incommmingBuffer.db incommmingBuffer.db
store.log store.log
changes.md changes.md
steward.sock

View file

@ -211,7 +211,7 @@ methodTimeout
### How to send a Message ### How to send a Message
Right now the API for sending a message from one node to another node is by pasting a structured JSON object into a file called `steward.sock` living alongside the binary. This file will be watched continously, and when updated the content will be picked up, umarshaled, and if OK it will be sent a message to the node specified in the `toNode` field. Right now the API for sending a message from one node to another node is by pasting a structured JSON object into a file called `msg.pipe` living alongside the binary. This file will be watched continously, and when updated the content will be picked up, umarshaled, and if OK it will be sent a message to the node specified in the `toNode` field.
The `method` is what defines what the event will do. The preconfigured methods are: The `method` is what defines what the event will do. The preconfigured methods are:
@ -263,7 +263,7 @@ NB: Both the keys and the values used are case sensitive.
#### Sending a command from one Node to Another Node #### Sending a command from one Node to Another Node
Example JSON for appending a message of type command into the `steward.sock` file Example JSON for appending a message of type command into the `msg.pipe` file
```json ```json
[ [
@ -317,11 +317,11 @@ To send a message with custom timeout and amount of retries
] ]
``` ```
You can save the content to myfile.JSON and append it to `steward.sock` You can save the content to myfile.JSON and append it to `msg.pipe`
`cat myfile.json >> steward.sock` `cat myfile.json >> msg.pipe`
The content of `steward.sock` will be erased as messages a processed. The content of `msg.pipe` will be erased as messages a processed.
#### Sending a message of type Event #### Sending a message of type Event
@ -336,11 +336,11 @@ The content of `steward.sock` will be erased as messages a processed.
] ]
``` ```
You can save the content to myfile.JSON and append it to `steward.sock` You can save the content to myfile.JSON and append it to `msg.pipe`
`cat myfile.json >> steward.sock` `cat myfile.json >> msg.pipe`
The content of `steward.sock` will be erased as messages a processed. The content of `msg.pipe` will be erased as messages a processed.
## Concepts/Ideas ## Concepts/Ideas
@ -369,7 +369,7 @@ and for a shell command of type command to a host named "ship2"
## TODO ## TODO
- FIX so it can handle multiple slices of input for steward.sock - FIX so it can handle multiple slices of input for msg.pipe
- Make a scraper that first send an EventACK, and the content of the scraping is returned by a node as a new EventACK back the where the initial event originated. - Make a scraper that first send an EventACK, and the content of the scraping is returned by a node as a new EventACK back the where the initial event originated.

View file

@ -2,7 +2,7 @@
{ {
"toNode": "ship1", "toNode": "ship1",
"data": ["bash","-c","sleep 3 & echo 'apekatt'"], "data": ["bash","-c","sleep 3 & tree ./"],
"method":"CLICommandRequest", "method":"CLICommandRequest",
"timeout":10, "timeout":10,
"retries":3, "retries":3,

1
go.mod
View file

@ -3,7 +3,6 @@ module github.com/RaaLabs/steward
go 1.15 go 1.15
require ( require (
github.com/fsnotify/fsnotify v1.4.9
github.com/nats-io/nats-server/v2 v2.1.9 // indirect github.com/nats-io/nats-server/v2 v2.1.9 // indirect
github.com/nats-io/nats.go v1.10.0 github.com/nats-io/nats.go v1.10.0
github.com/pelletier/go-toml v1.8.1 github.com/pelletier/go-toml v1.8.1

3
go.sum
View file

@ -55,8 +55,6 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
@ -342,7 +340,6 @@ golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

View file

@ -55,12 +55,12 @@ type process struct {
// copy of the configuration from server // copy of the configuration from server
configuration *Configuration configuration *Configuration
// The new messages channel copied from *Server // The new messages channel copied from *Server
newMessagesCh chan<- []subjectAndMessage toRingbufferCh chan<- []subjectAndMessage
} }
// prepareNewProcess will set the the provided values and the default // prepareNewProcess will set the the provided values and the default
// values for a process. // values for a process.
func newProcess(processes *processes, newMessagesCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []node, procFunc func() error) process { func newProcess(processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []node, procFunc func() error) process {
// create the initial configuration for a sessions communicating with 1 host process. // create the initial configuration for a sessions communicating with 1 host process.
processes.lastProcessID++ processes.lastProcessID++
@ -81,7 +81,7 @@ func newProcess(processes *processes, newMessagesCh chan<- []subjectAndMessage,
processKind: processKind, processKind: processKind,
allowedReceivers: m, allowedReceivers: m,
methodsAvailable: method.GetMethodsAvailable(), methodsAvailable: method.GetMethodsAvailable(),
newMessagesCh: newMessagesCh, toRingbufferCh: toRingbufferCh,
configuration: configuration, configuration: configuration,
} }
@ -135,7 +135,7 @@ func (p process) spawnWorker(s *server) {
err := p.procFunc() err := p.procFunc()
if err != nil { if err != nil {
er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err) er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err)
sendErrorLogMessage(p.newMessagesCh, node(p.node), er) sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
} }
}() }()
} }
@ -155,7 +155,7 @@ func (p process) spawnWorker(s *server) {
err := p.procFunc() err := p.procFunc()
if err != nil { if err != nil {
er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err) er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err)
sendErrorLogMessage(p.newMessagesCh, node(p.node), er) sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
} }
}() }()
} }
@ -174,7 +174,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
dataPayload, err := gobEncodeMessage(message) dataPayload, err := gobEncodeMessage(message)
if err != nil { if err != nil {
er := fmt.Errorf("error: createDataPayload: %v", err) er := fmt.Errorf("error: createDataPayload: %v", err)
sendErrorLogMessage(p.newMessagesCh, node(p.node), er) sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
continue continue
} }
@ -195,7 +195,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
subReply, err := natsConn.SubscribeSync(msg.Reply) subReply, err := natsConn.SubscribeSync(msg.Reply)
if err != nil { if err != nil {
er := fmt.Errorf("error: nc.SubscribeSync failed: failed to create reply message: %v", err) er := fmt.Errorf("error: nc.SubscribeSync failed: failed to create reply message: %v", err)
sendErrorLogMessage(p.newMessagesCh, node(p.node), er) sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
continue continue
} }
@ -203,7 +203,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
err = natsConn.PublishMsg(msg) err = natsConn.PublishMsg(msg)
if err != nil { if err != nil {
er := fmt.Errorf("error: publish failed: %v", err) er := fmt.Errorf("error: publish failed: %v", err)
sendErrorLogMessage(p.newMessagesCh, node(p.node), er) sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
continue continue
} }
@ -217,7 +217,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
msgReply, err := subReply.NextMsg(time.Second * time.Duration(message.Timeout)) msgReply, err := subReply.NextMsg(time.Second * time.Duration(message.Timeout))
if err != nil { if err != nil {
er := fmt.Errorf("error: subReply.NextMsg failed for node=%v, subject=%v: %v", p.node, p.subject.name(), err) er := fmt.Errorf("error: subReply.NextMsg failed for node=%v, subject=%v: %v", p.node, p.subject.name(), err)
sendErrorLogMessage(p.newMessagesCh, message.FromNode, er) sendErrorLogMessage(p.toRingbufferCh, message.FromNode, er)
// did not receive a reply, decide what to do.. // did not receive a reply, decide what to do..
retryAttempts++ retryAttempts++
@ -229,7 +229,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
case retryAttempts >= message.Retries: case retryAttempts >= message.Retries:
// max retries reached // max retries reached
er := fmt.Errorf("info: max retries for message reached, breaking out: %v", message) er := fmt.Errorf("info: max retries for message reached, breaking out: %v", message)
sendErrorLogMessage(p.newMessagesCh, message.FromNode, er) sendErrorLogMessage(p.toRingbufferCh, message.FromNode, er)
return return
default: default:
@ -264,7 +264,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
err := gobDec.Decode(&message) err := gobDec.Decode(&message)
if err != nil { if err != nil {
er := fmt.Errorf("error: gob decoding failed: %v", err) er := fmt.Errorf("error: gob decoding failed: %v", err)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), er) sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er)
} }
switch { switch {
@ -272,7 +272,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
mh, ok := p.methodsAvailable.CheckIfExists(message.Method) mh, ok := p.methodsAvailable.CheckIfExists(message.Method)
if !ok { if !ok {
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent) er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), er) sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er)
} }
out := []byte("not allowed from " + message.FromNode) out := []byte("not allowed from " + message.FromNode)
@ -291,11 +291,11 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
if err != nil { if err != nil {
er := fmt.Errorf("error: subscriberHandler: failed to execute event: %v", err) er := fmt.Errorf("error: subscriberHandler: failed to execute event: %v", err)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), er) sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er)
} }
} else { } else {
er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject) er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), er) sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er)
} }
// Send a confirmation message back to the publisher // Send a confirmation message back to the publisher
@ -305,7 +305,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
mf, ok := p.methodsAvailable.CheckIfExists(message.Method) mf, ok := p.methodsAvailable.CheckIfExists(message.Method)
if !ok { if !ok {
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent) er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), er) sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er)
} }
// Check if we are allowed to receive from that host // Check if we are allowed to receive from that host
@ -325,16 +325,16 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
if err != nil { if err != nil {
er := fmt.Errorf("error: subscriberHandler: failed to execute event: %v", err) er := fmt.Errorf("error: subscriberHandler: failed to execute event: %v", err)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), er) sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er)
} }
} else { } else {
er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject) er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), er) sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er)
} }
// --- // ---
default: default:
er := fmt.Errorf("info: did not find that specific type of command: %#v", p.subject.CommandOrEvent) er := fmt.Errorf("info: did not find that specific type of command: %#v", p.subject.CommandOrEvent)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), er) sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er)
} }
} }

View file

@ -1,57 +1,65 @@
package steward package steward
import ( import (
"bufio" "bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"log" "log"
"net"
"os" "os"
"github.com/fsnotify/fsnotify"
) )
// getMessagesFromFile will start a file watcher for the given directory // readSocket will read the .sock file specified.
// and filename. It will take a channel of []byte as input, and it is // It will take a channel of []byte as input, and it is in this
// in this channel the content of a file that has changed is returned. // channel the content of a file that has changed is returned.
func (s *server) getMessagesFromFile(directoryToCheck string, fileName string, inputFromFileCh chan []subjectAndMessage) { func (s *server) readSocket(toRingbufferCh chan []subjectAndMessage) {
fileUpdated := make(chan bool) err := os.Remove("steward.sock")
go fileWatcherStart(directoryToCheck, fileUpdated)
for range fileUpdated {
//load file, read it's content
b, err := readTruncateMessageFile(fileName)
if err != nil { if err != nil {
log.Printf("error: reading file: %v", err) log.Printf("error: could not delete sock file: %v\n", err)
} }
// Start on top again if the file did not contain l, err := net.Listen("unix", "steward.sock")
// any data. if err != nil {
if len(b) == 0 { log.Printf("error: failed to open socket: %v\n", err)
os.Exit(1)
}
// Loop, and wait for new connections.
for {
conn, err := l.Accept()
if err != nil {
er := fmt.Errorf("error: failed to accept conn on socket: %v", err)
sendErrorLogMessage(toRingbufferCh, node(s.nodeName), er)
}
b := make([]byte, 65535)
_, err = conn.Read(b)
if err != nil {
er := fmt.Errorf("error: failed to read data from socket: %v", err)
sendErrorLogMessage(toRingbufferCh, node(s.nodeName), er)
continue continue
} }
b = bytes.Trim(b, "\x00")
// unmarshal the JSON into a struct // unmarshal the JSON into a struct
js, err := jsonFromFileData(b) sam, err := convertBytesToSAM(b)
if err != nil { if err != nil {
er := fmt.Errorf("error: malformed json: %v", err) er := fmt.Errorf("error: malformed json: %v", err)
sendErrorLogMessage(s.newMessagesCh, node(s.nodeName), er) sendErrorLogMessage(toRingbufferCh, node(s.nodeName), er)
continue continue
} }
for i := range js { for i := range sam {
fmt.Printf("*** Checking message found in file: messageType type: %T, messagetype contains: %#v\n", js[i].Subject.CommandOrEvent, js[i].Subject.CommandOrEvent)
// Fill in the value for the FromNode field, so the receiver // Fill in the value for the FromNode field, so the receiver
// can check this field to know where it came from. // can check this field to know where it came from.
js[i].Message.FromNode = node(s.nodeName) sam[i].Message.FromNode = node(s.nodeName)
} }
// Send the data back to be consumed // Send the SAM struct to be picked up by the ring buffer.
inputFromFileCh <- js toRingbufferCh <- sam
} }
er := fmt.Errorf("error: getMessagesFromFile stopped")
sendErrorLogMessage(s.newMessagesCh, node(s.nodeName), er)
} }
type subjectAndMessage struct { type subjectAndMessage struct {
@ -59,10 +67,11 @@ type subjectAndMessage struct {
Message `json:"message" yaml:"message"` Message `json:"message" yaml:"message"`
} }
// jsonFromFileData will range over the message given in json format. For // convertBytesToSAM will range over the byte representing a message given in
// each element found the Message type will be converted into a SubjectAndMessage // json format. For each element found the Message type will be converted into
// type value and appended to a slice, and the slice is returned to the caller. // a SubjectAndMessage type value and appended to a slice, and the slice is
func jsonFromFileData(b []byte) ([]subjectAndMessage, error) { // returned to the caller.
func convertBytesToSAM(b []byte) ([]subjectAndMessage, error) {
MsgSlice := []Message{} MsgSlice := []Message{}
err := json.Unmarshal(b, &MsgSlice) err := json.Unmarshal(b, &MsgSlice)
@ -114,72 +123,3 @@ func newSAM(m Message) (subjectAndMessage, error) {
return sm, nil return sm, nil
} }
// readTruncateMessageFile, will read all the messages in the given
// file, and truncate the file after read.
// A []byte will be returned with the content read.
func readTruncateMessageFile(fileName string) ([]byte, error) {
f, err := os.OpenFile(fileName, os.O_APPEND|os.O_RDWR|os.O_CREATE, os.ModeAppend)
if err != nil {
log.Printf("error: readTruncateMessageFile: Failed to open file: %v\n", err)
return nil, err
}
defer f.Close()
scanner := bufio.NewScanner(f)
lines := []byte{}
for scanner.Scan() {
lines = append(lines, scanner.Bytes()...)
}
// empty the file after all is read
_, err = f.Seek(0, io.SeekStart)
if err != nil {
return nil, fmt.Errorf("f.Seek failed: %v", err)
}
err = f.Truncate(0)
if err != nil {
return nil, fmt.Errorf("f.Truncate failed: %v", err)
}
return lines, nil
}
// Start the file watcher that will check if the in pipe for new operator
// messages are updated with new content.
func fileWatcherStart(directoryToCheck string, fileUpdated chan bool) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Println("Failed fsnotify.NewWatcher")
return
}
defer watcher.Close()
done := make(chan bool)
go func() {
//Give a true value to updated so it reads the file the first time.
fileUpdated <- true
for {
select {
case event := <-watcher.Events:
if event.Op&fsnotify.Write == fsnotify.Write {
// log.Println("info: steward.sock file updated, processing input: ", event.Name)
//testing with an update chan to get updates
fileUpdated <- true
}
case err := <-watcher.Errors:
log.Println("error:", err)
}
}
}()
err = watcher.Add(directoryToCheck)
if err != nil {
log.Printf("error: watcher add: %v\n", err)
}
<-done
}

View file

@ -109,7 +109,7 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
} }
// Check for incomming messages. These are typically comming from // Check for incomming messages. These are typically comming from
// the go routine who reads steward.sock. // the go routine who reads msg.pipe.
for v := range inCh { for v := range inCh {
// Check if the command or event exists in commandOrEvent.go // Check if the command or event exists in commandOrEvent.go

View file

@ -51,7 +51,7 @@ type server struct {
// The name of the node // The name of the node
nodeName string nodeName string
// Mutex for locking when writing to the process map // Mutex for locking when writing to the process map
newMessagesCh chan []subjectAndMessage toRingbufferCh chan []subjectAndMessage
// errorKernel is doing all the error handling like what to do if // errorKernel is doing all the error handling like what to do if
// an error occurs. // an error occurs.
errorKernel *errorKernel errorKernel *errorKernel
@ -71,7 +71,7 @@ func NewServer(c *Configuration) (*server, error) {
nodeName: c.NodeName, nodeName: c.NodeName,
natsConn: conn, natsConn: conn,
processes: newProcesses(), processes: newProcesses(),
newMessagesCh: make(chan []subjectAndMessage), toRingbufferCh: make(chan []subjectAndMessage),
metrics: newMetrics(c.PromHostAndPort), metrics: newMetrics(c.PromHostAndPort),
} }
@ -101,19 +101,13 @@ func (s *server) Start() {
// Start the error kernel that will do all the error handling // Start the error kernel that will do all the error handling
// not done within a process. // not done within a process.
s.errorKernel = newErrorKernel() s.errorKernel = newErrorKernel()
s.errorKernel.startErrorKernel(s.newMessagesCh) s.errorKernel.startErrorKernel(s.toRingbufferCh)
// Start collecting the metrics // Start collecting the metrics
go s.startMetrics() go s.startMetrics()
// Start the checking the input file for new messages from operator. // Start the checking the input socket for new messages from operator.
go s.getMessagesFromFile("./", "steward.sock", s.newMessagesCh) go s.readSocket(s.toRingbufferCh)
// // if enabled, start the sayHello I'm here service at the given interval
// // REMOVED:
// if s.publisherServices.sayHelloPublisher.interval != 0 {
// go s.publisherServices.sayHelloPublisher.start(s.newMessagesCh, node(s.nodeName))
// }
// Start up the predefined subscribers. // Start up the predefined subscribers.
s.ProcessesStart() s.ProcessesStart()
@ -122,7 +116,7 @@ func (s *server) Start() {
s.printProcessesMap() s.printProcessesMap()
// Start the processing of new messages from an input channel. // Start the processing of new messages from an input channel.
s.routeMessagesToProcess("./incommmingBuffer.db", s.newMessagesCh) s.routeMessagesToProcess("./incommmingBuffer.db", s.toRingbufferCh)
select {} select {}
@ -192,7 +186,7 @@ func createErrorMsgContent(FromNode node, theError error) subjectAndMessage {
func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subjectAndMessage) { func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subjectAndMessage) {
// Prepare and start a new ring buffer // Prepare and start a new ring buffer
const bufferSize int = 1000 const bufferSize int = 1000
rb := newringBuffer(bufferSize, dbFileName, node(s.nodeName), s.newMessagesCh) rb := newringBuffer(bufferSize, dbFileName, node(s.nodeName), s.toRingbufferCh)
inCh := make(chan subjectAndMessage) inCh := make(chan subjectAndMessage)
ringBufferOutCh := make(chan samDBValue) ringBufferOutCh := make(chan samDBValue)
// start the ringbuffer. // start the ringbuffer.
@ -225,12 +219,12 @@ func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subject
// Check if the format of the message is correct. // Check if the format of the message is correct.
if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok { if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok {
er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method) er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method)
sendErrorLogMessage(s.newMessagesCh, node(s.nodeName), er) sendErrorLogMessage(s.toRingbufferCh, node(s.nodeName), er)
continue continue
} }
if !coeAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) { if !coeAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) {
er := fmt.Errorf("error: routeMessagesToProcess: the command or event do not exist, message dropped: %v", sam.Message.Method) er := fmt.Errorf("error: routeMessagesToProcess: the command or event do not exist, message dropped: %v", sam.Message.Method)
sendErrorLogMessage(s.newMessagesCh, node(s.nodeName), er) sendErrorLogMessage(s.toRingbufferCh, node(s.nodeName), er)
continue continue
} }
@ -264,7 +258,7 @@ func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subject
log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName) log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName)
sub := newSubject(sam.Subject.Method, sam.Subject.CommandOrEvent, sam.Subject.ToNode) sub := newSubject(sam.Subject.Method, sam.Subject.CommandOrEvent, sam.Subject.ToNode)
proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil, nil) proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil, nil)
// fmt.Printf("*** %#v\n", proc) // fmt.Printf("*** %#v\n", proc)
proc.spawnWorker(s) proc.spawnWorker(s)

View file

@ -17,7 +17,7 @@ func (s *server) ProcessesStart() {
{ {
fmt.Printf("Starting CLICommand subscriber: %#v\n", s.nodeName) fmt.Printf("Starting CLICommand subscriber: %#v\n", s.nodeName)
sub := newSubject(CLICommand, CommandACK, s.nodeName) sub := newSubject(CLICommand, CommandACK, s.nodeName)
proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubCLICommand.Values, nil) proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubCLICommand.Values, nil)
// fmt.Printf("*** %#v\n", proc) // fmt.Printf("*** %#v\n", proc)
go proc.spawnWorker(s) go proc.spawnWorker(s)
} }
@ -28,7 +28,7 @@ func (s *server) ProcessesStart() {
{ {
fmt.Printf("Starting textlogging subscriber: %#v\n", s.nodeName) fmt.Printf("Starting textlogging subscriber: %#v\n", s.nodeName)
sub := newSubject(TextLogging, EventACK, s.nodeName) sub := newSubject(TextLogging, EventACK, s.nodeName)
proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubTextLogging.Values, nil) proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubTextLogging.Values, nil)
// fmt.Printf("*** %#v\n", proc) // fmt.Printf("*** %#v\n", proc)
go proc.spawnWorker(s) go proc.spawnWorker(s)
} }
@ -39,7 +39,7 @@ func (s *server) ProcessesStart() {
{ {
fmt.Printf("Starting SayHello subscriber: %#v\n", s.nodeName) fmt.Printf("Starting SayHello subscriber: %#v\n", s.nodeName)
sub := newSubject(SayHello, EventNACK, s.nodeName) sub := newSubject(SayHello, EventNACK, s.nodeName)
proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubSayHello.Values, nil) proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubSayHello.Values, nil)
proc.procFuncCh = make(chan Message) proc.procFuncCh = make(chan Message)
proc.procFunc = func() error { proc.procFunc = func() error {
@ -70,7 +70,7 @@ func (s *server) ProcessesStart() {
{ {
fmt.Printf("Starting ErrorLog subscriber: %#v\n", s.nodeName) fmt.Printf("Starting ErrorLog subscriber: %#v\n", s.nodeName)
sub := newSubject(ErrorLog, EventNACK, "errorCentral") sub := newSubject(ErrorLog, EventNACK, "errorCentral")
proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubErrorLog.Values, nil) proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubErrorLog.Values, nil)
go proc.spawnWorker(s) go proc.spawnWorker(s)
} }
} }
@ -80,7 +80,7 @@ func (s *server) ProcessesStart() {
{ {
fmt.Printf("Starting Echo Request subscriber: %#v\n", s.nodeName) fmt.Printf("Starting Echo Request subscriber: %#v\n", s.nodeName)
sub := newSubject(ECHORequest, EventACK, s.nodeName) sub := newSubject(ECHORequest, EventACK, s.nodeName)
proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubEchoRequest.Values, nil) proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubEchoRequest.Values, nil)
go proc.spawnWorker(s) go proc.spawnWorker(s)
} }
} }
@ -90,7 +90,7 @@ func (s *server) ProcessesStart() {
{ {
fmt.Printf("Starting Echo Reply subscriber: %#v\n", s.nodeName) fmt.Printf("Starting Echo Reply subscriber: %#v\n", s.nodeName)
sub := newSubject(ECHOReply, EventACK, s.nodeName) sub := newSubject(ECHOReply, EventACK, s.nodeName)
proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubEchoReply.Values, nil) proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubEchoReply.Values, nil)
go proc.spawnWorker(s) go proc.spawnWorker(s)
} }
} }
@ -100,7 +100,7 @@ func (s *server) ProcessesStart() {
{ {
fmt.Printf("Starting CLICommand Request subscriber: %#v\n", s.nodeName) fmt.Printf("Starting CLICommand Request subscriber: %#v\n", s.nodeName)
sub := newSubject(CLICommandRequest, EventACK, s.nodeName) sub := newSubject(CLICommandRequest, EventACK, s.nodeName)
proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubCLICommandRequest.Values, nil) proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubCLICommandRequest.Values, nil)
go proc.spawnWorker(s) go proc.spawnWorker(s)
} }
} }
@ -110,7 +110,7 @@ func (s *server) ProcessesStart() {
{ {
fmt.Printf("Starting CLICommand NOSEQ Request subscriber: %#v\n", s.nodeName) fmt.Printf("Starting CLICommand NOSEQ Request subscriber: %#v\n", s.nodeName)
sub := newSubject(CLICommandRequestNOSEQ, EventACK, s.nodeName) sub := newSubject(CLICommandRequestNOSEQ, EventACK, s.nodeName)
proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubCLICommandRequestNOSEQ.Values, nil) proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubCLICommandRequestNOSEQ.Values, nil)
go proc.spawnWorker(s) go proc.spawnWorker(s)
} }
} }
@ -120,7 +120,7 @@ func (s *server) ProcessesStart() {
{ {
fmt.Printf("Starting CLICommand Reply subscriber: %#v\n", s.nodeName) fmt.Printf("Starting CLICommand Reply subscriber: %#v\n", s.nodeName)
sub := newSubject(CLICommandReply, EventACK, s.nodeName) sub := newSubject(CLICommandReply, EventACK, s.nodeName)
proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubCLICommandReply.Values, nil) proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubCLICommandReply.Values, nil)
go proc.spawnWorker(s) go proc.spawnWorker(s)
} }
} }
@ -135,7 +135,7 @@ func (s *server) ProcessesStart() {
fmt.Printf("Starting SayHello Publisher: %#v\n", s.nodeName) fmt.Printf("Starting SayHello Publisher: %#v\n", s.nodeName)
sub := newSubject(SayHello, EventNACK, s.configuration.CentralNodeName) sub := newSubject(SayHello, EventNACK, s.configuration.CentralNodeName)
proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, []node{}, nil) proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, []node{}, nil)
// Define the procFunc to be used for the process. // Define the procFunc to be used for the process.
proc.procFunc = procFunc( proc.procFunc = procFunc(
@ -157,7 +157,7 @@ func (s *server) ProcessesStart() {
// In theory the system should drop the message before it reaches here. // In theory the system should drop the message before it reaches here.
log.Printf("error: ProcessesStart: %v\n", err) log.Printf("error: ProcessesStart: %v\n", err)
} }
proc.newMessagesCh <- []subjectAndMessage{sam} proc.toRingbufferCh <- []subjectAndMessage{sam}
time.Sleep(time.Second * time.Duration(s.configuration.StartPubSayHello)) time.Sleep(time.Second * time.Duration(s.configuration.StartPubSayHello))
} }
}) })

View file

View file

@ -223,7 +223,7 @@ func (m methodCLICommand) handler(proc process, message Message, node string) ([
case <-ctx.Done(): case <-ctx.Done():
cancel() cancel()
er := fmt.Errorf("error: method timed out %v", message) er := fmt.Errorf("error: method timed out %v", message)
sendErrorLogMessage(proc.newMessagesCh, proc.node, er) sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
case out = <-outCh: case out = <-outCh:
cancel() cancel()
} }
@ -334,7 +334,7 @@ func (m methodEchoRequest) handler(proc process, message Message, node string) (
// In theory the system should drop the message before it reaches here. // In theory the system should drop the message before it reaches here.
log.Printf("error: methodEchoRequest: %v\n", err) log.Printf("error: methodEchoRequest: %v\n", err)
} }
proc.newMessagesCh <- []subjectAndMessage{nSAM} proc.toRingbufferCh <- []subjectAndMessage{nSAM}
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil return ackMsg, nil
@ -398,7 +398,7 @@ func (m methodCLICommandRequest) handler(proc process, message Message, node str
case <-ctx.Done(): case <-ctx.Done():
cancel() cancel()
er := fmt.Errorf("error: method timed out %v", message) er := fmt.Errorf("error: method timed out %v", message)
sendErrorLogMessage(proc.newMessagesCh, proc.node, er) sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
case out := <-outCh: case out := <-outCh:
cancel() cancel()
@ -418,7 +418,7 @@ func (m methodCLICommandRequest) handler(proc process, message Message, node str
// In theory the system should drop the message before it reaches here. // In theory the system should drop the message before it reaches here.
log.Printf("error: methodCLICommandRequest: %v\n", err) log.Printf("error: methodCLICommandRequest: %v\n", err)
} }
proc.newMessagesCh <- []subjectAndMessage{nSAM} proc.toRingbufferCh <- []subjectAndMessage{nSAM}
} }
}() }()
@ -471,7 +471,7 @@ func (m methodCLICommandRequestNOSEQ) handler(proc process, message Message, nod
case <-ctx.Done(): case <-ctx.Done():
cancel() cancel()
er := fmt.Errorf("error: method timed out %v", message) er := fmt.Errorf("error: method timed out %v", message)
sendErrorLogMessage(proc.newMessagesCh, proc.node, er) sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
case out := <-outCh: case out := <-outCh:
cancel() cancel()
@ -491,7 +491,7 @@ func (m methodCLICommandRequestNOSEQ) handler(proc process, message Message, nod
// In theory the system should drop the message before it reaches here. // In theory the system should drop the message before it reaches here.
log.Printf("error: methodCLICommandRequest: %v\n", err) log.Printf("error: methodCLICommandRequest: %v\n", err)
} }
proc.newMessagesCh <- []subjectAndMessage{nSAM} proc.toRingbufferCh <- []subjectAndMessage{nSAM}
} }
}() }()