mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
exported type node to Node
This commit is contained in:
parent
e333e6fa54
commit
923fd4c611
10 changed files with 86 additions and 84 deletions
|
@ -3,7 +3,7 @@ package main
|
||||||
import (
|
import (
|
||||||
"log"
|
"log"
|
||||||
|
|
||||||
"github.com/RaaLabs/steward"
|
steward "github.com/RaaLabs/steward/stew"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
|
|
@ -24,7 +24,7 @@ import (
|
||||||
type flagNodeSlice struct {
|
type flagNodeSlice struct {
|
||||||
value string
|
value string
|
||||||
OK bool
|
OK bool
|
||||||
Values []node
|
Values []Node
|
||||||
}
|
}
|
||||||
|
|
||||||
// String method
|
// String method
|
||||||
|
@ -60,17 +60,17 @@ func (f *flagNodeSlice) Parse() error {
|
||||||
if split[0] == "RST" {
|
if split[0] == "RST" {
|
||||||
f.OK = false
|
f.OK = false
|
||||||
f.value = ""
|
f.value = ""
|
||||||
f.Values = []node{}
|
f.Values = []Node{}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
fv := f.value
|
fv := f.value
|
||||||
sp := strings.Split(fv, ",")
|
sp := strings.Split(fv, ",")
|
||||||
f.OK = true
|
f.OK = true
|
||||||
f.Values = []node{}
|
f.Values = []Node{}
|
||||||
|
|
||||||
for _, v := range sp {
|
for _, v := range sp {
|
||||||
f.Values = append(f.Values, node(v))
|
f.Values = append(f.Values, Node(v))
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -155,17 +155,17 @@ func newConfigurationDefaults() Configuration {
|
||||||
CentralNodeName: "",
|
CentralNodeName: "",
|
||||||
RootCAPath: "",
|
RootCAPath: "",
|
||||||
NkeySeedFile: "",
|
NkeySeedFile: "",
|
||||||
StartSubREQErrorLog: flagNodeSlice{Values: []node{}},
|
StartSubREQErrorLog: flagNodeSlice{Values: []Node{}},
|
||||||
StartSubREQHello: flagNodeSlice{OK: true, Values: []node{"*"}},
|
StartSubREQHello: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
StartSubREQToFileAppend: flagNodeSlice{OK: true, Values: []node{"*"}},
|
StartSubREQToFileAppend: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
StartSubREQToFile: flagNodeSlice{OK: true, Values: []node{"*"}},
|
StartSubREQToFile: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
StartSubREQPing: flagNodeSlice{OK: true, Values: []node{"*"}},
|
StartSubREQPing: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
StartSubREQPong: flagNodeSlice{OK: true, Values: []node{"*"}},
|
StartSubREQPong: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
StartSubREQCliCommand: flagNodeSlice{OK: true, Values: []node{"*"}},
|
StartSubREQCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
StartSubREQnCliCommand: flagNodeSlice{OK: true, Values: []node{"*"}},
|
StartSubREQnCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
StartSubREQToConsole: flagNodeSlice{OK: true, Values: []node{"*"}},
|
StartSubREQToConsole: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
StartSubREQHttpGet: flagNodeSlice{OK: true, Values: []node{"*"}},
|
StartSubREQHttpGet: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
StartSubREQTailFile: flagNodeSlice{OK: true, Values: []node{"*"}},
|
StartSubREQTailFile: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
}
|
}
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,7 +13,7 @@ import (
|
||||||
|
|
||||||
type Message struct {
|
type Message struct {
|
||||||
// The node to send the message to
|
// The node to send the message to
|
||||||
ToNode node `json:"toNode" yaml:"toNode"`
|
ToNode Node `json:"toNode" yaml:"toNode"`
|
||||||
// The Unique ID of the message
|
// The Unique ID of the message
|
||||||
ID int `json:"id" yaml:"id"`
|
ID int `json:"id" yaml:"id"`
|
||||||
// The actual data in the message
|
// The actual data in the message
|
||||||
|
@ -25,7 +25,7 @@ type Message struct {
|
||||||
// you can override it setting your own here.
|
// you can override it setting your own here.
|
||||||
ReplyMethod Method `json:"replyMethod" yaml:"replyMethod"`
|
ReplyMethod Method `json:"replyMethod" yaml:"replyMethod"`
|
||||||
// From what node the message originated
|
// From what node the message originated
|
||||||
FromNode node
|
FromNode Node
|
||||||
// ACKTimeout for waiting for an ack message
|
// ACKTimeout for waiting for an ack message
|
||||||
ACKTimeout int `json:"ACKTimeout" yaml:"ACKTimeout"`
|
ACKTimeout int `json:"ACKTimeout" yaml:"ACKTimeout"`
|
||||||
// Resend retries
|
// Resend retries
|
||||||
|
@ -83,7 +83,7 @@ func gobEncodeMessage(m Message) ([]byte, error) {
|
||||||
|
|
||||||
// --- Subject
|
// --- Subject
|
||||||
|
|
||||||
type node string
|
type Node string
|
||||||
|
|
||||||
// subject contains the representation of a subject to be used with one
|
// subject contains the representation of a subject to be used with one
|
||||||
// specific process
|
// specific process
|
||||||
|
@ -107,7 +107,7 @@ type Subject struct {
|
||||||
func newSubject(method Method, node string) Subject {
|
func newSubject(method Method, node string) Subject {
|
||||||
// Get the CommandOrEvent type for the Method.
|
// Get the CommandOrEvent type for the Method.
|
||||||
ma := method.GetMethodsAvailable()
|
ma := method.GetMethodsAvailable()
|
||||||
coe, ok := ma.methodhandlers[method]
|
coe, ok := ma.Methodhandlers[method]
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Printf("error: no CommandOrEvent type specified for the method: %v\n", method)
|
log.Printf("error: no CommandOrEvent type specified for the method: %v\n", method)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
|
|
34
process.go
34
process.go
|
@ -31,7 +31,7 @@ type process struct {
|
||||||
subject Subject
|
subject Subject
|
||||||
// Put a node here to be able know the node a process is at.
|
// Put a node here to be able know the node a process is at.
|
||||||
// NB: Might not be needed later on.
|
// NB: Might not be needed later on.
|
||||||
node node
|
node Node
|
||||||
// The processID for the current process
|
// The processID for the current process
|
||||||
processID int
|
processID int
|
||||||
// errorCh is the same channel the errorKernel uses to
|
// errorCh is the same channel the errorKernel uses to
|
||||||
|
@ -42,7 +42,7 @@ type process struct {
|
||||||
errorCh chan errProcess
|
errorCh chan errProcess
|
||||||
processKind processKind
|
processKind processKind
|
||||||
// Who are we allowed to receive from ?
|
// Who are we allowed to receive from ?
|
||||||
allowedReceivers map[node]struct{}
|
allowedReceivers map[Node]struct{}
|
||||||
// methodsAvailable
|
// methodsAvailable
|
||||||
methodsAvailable MethodsAvailable
|
methodsAvailable MethodsAvailable
|
||||||
// Helper or service function that can do some kind of work
|
// Helper or service function that can do some kind of work
|
||||||
|
@ -78,12 +78,12 @@ type process struct {
|
||||||
|
|
||||||
// prepareNewProcess will set the the provided values and the default
|
// prepareNewProcess will set the the provided values and the default
|
||||||
// values for a process.
|
// values for a process.
|
||||||
func newProcess(natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []node, procFunc func() error) process {
|
func newProcess(natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []Node, procFunc func() error) process {
|
||||||
// create the initial configuration for a sessions communicating with 1 host process.
|
// create the initial configuration for a sessions communicating with 1 host process.
|
||||||
processes.lastProcessID++
|
processes.lastProcessID++
|
||||||
|
|
||||||
// make the slice of allowedReceivers into a map value for easy lookup.
|
// make the slice of allowedReceivers into a map value for easy lookup.
|
||||||
m := make(map[node]struct{})
|
m := make(map[Node]struct{})
|
||||||
for _, a := range allowedReceivers {
|
for _, a := range allowedReceivers {
|
||||||
m[a] = struct{}{}
|
m[a] = struct{}{}
|
||||||
}
|
}
|
||||||
|
@ -95,7 +95,7 @@ func newProcess(natsConn *nats.Conn, processes *processes, toRingbufferCh chan<-
|
||||||
proc := process{
|
proc := process{
|
||||||
messageID: 0,
|
messageID: 0,
|
||||||
subject: subject,
|
subject: subject,
|
||||||
node: node(configuration.NodeName),
|
node: Node(configuration.NodeName),
|
||||||
processID: processes.lastProcessID,
|
processID: processes.lastProcessID,
|
||||||
errorCh: errCh,
|
errorCh: errCh,
|
||||||
processKind: processKind,
|
processKind: processKind,
|
||||||
|
@ -158,7 +158,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
|
||||||
err := p.procFunc(p.ctx)
|
err := p.procFunc(p.ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err)
|
er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err)
|
||||||
sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
|
sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
@ -178,7 +178,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
|
||||||
err := p.procFunc(p.ctx)
|
err := p.procFunc(p.ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err)
|
er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err)
|
||||||
sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
|
sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
@ -211,7 +211,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
|
||||||
dataPayload, err := gobEncodeMessage(message)
|
dataPayload, err := gobEncodeMessage(message)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: createDataPayload: %v", err)
|
er := fmt.Errorf("error: createDataPayload: %v", err)
|
||||||
sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
|
sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -305,7 +305,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
|
||||||
err := gobDec.Decode(&message)
|
err := gobDec.Decode(&message)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: gob decoding failed: %v", err)
|
er := fmt.Errorf("error: gob decoding failed: %v", err)
|
||||||
sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er)
|
sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er)
|
||||||
}
|
}
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
|
@ -313,7 +313,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
|
||||||
mh, ok := p.methodsAvailable.CheckIfExists(message.Method)
|
mh, ok := p.methodsAvailable.CheckIfExists(message.Method)
|
||||||
if !ok {
|
if !ok {
|
||||||
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent)
|
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent)
|
||||||
sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er)
|
sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er)
|
||||||
}
|
}
|
||||||
|
|
||||||
out := []byte("not allowed from " + message.FromNode)
|
out := []byte("not allowed from " + message.FromNode)
|
||||||
|
@ -332,11 +332,11 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
|
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
|
||||||
sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er)
|
sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject)
|
er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject)
|
||||||
sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er)
|
sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send a confirmation message back to the publisher
|
// Send a confirmation message back to the publisher
|
||||||
|
@ -346,7 +346,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
|
||||||
mf, ok := p.methodsAvailable.CheckIfExists(message.Method)
|
mf, ok := p.methodsAvailable.CheckIfExists(message.Method)
|
||||||
if !ok {
|
if !ok {
|
||||||
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent)
|
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent)
|
||||||
sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er)
|
sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if we are allowed to receive from that host
|
// Check if we are allowed to receive from that host
|
||||||
|
@ -366,16 +366,16 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
|
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
|
||||||
sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er)
|
sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject)
|
er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject)
|
||||||
sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er)
|
sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er)
|
||||||
}
|
}
|
||||||
// ---
|
// ---
|
||||||
default:
|
default:
|
||||||
er := fmt.Errorf("info: did not find that specific type of command: %#v", p.subject.CommandOrEvent)
|
er := fmt.Errorf("info: did not find that specific type of command: %#v", p.subject.CommandOrEvent)
|
||||||
sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er)
|
sendErrorLogMessage(p.toRingbufferCh, Node(thisNode), er)
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -413,7 +413,7 @@ func (p process) publishMessages(natsConn *nats.Conn) {
|
||||||
case m = <-p.subject.messageCh:
|
case m = <-p.subject.messageCh:
|
||||||
case <-p.ctx.Done():
|
case <-p.ctx.Done():
|
||||||
er := fmt.Errorf("info: canceling publisher: %v", p.subject.name())
|
er := fmt.Errorf("info: canceling publisher: %v", p.subject.name())
|
||||||
sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
|
sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Get the process name so we can look up the process in the
|
// Get the process name so we can look up the process in the
|
||||||
|
|
|
@ -17,14 +17,14 @@ func (s *server) readSocket(toRingbufferCh chan []subjectAndMessage) {
|
||||||
conn, err := s.netListener.Accept()
|
conn, err := s.netListener.Accept()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: failed to accept conn on socket: %v", err)
|
er := fmt.Errorf("error: failed to accept conn on socket: %v", err)
|
||||||
sendErrorLogMessage(toRingbufferCh, node(s.nodeName), er)
|
sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er)
|
||||||
}
|
}
|
||||||
|
|
||||||
b := make([]byte, 65535)
|
b := make([]byte, 65535)
|
||||||
_, err = conn.Read(b)
|
_, err = conn.Read(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: failed to read data from socket: %v", err)
|
er := fmt.Errorf("error: failed to read data from socket: %v", err)
|
||||||
sendErrorLogMessage(toRingbufferCh, node(s.nodeName), er)
|
sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -34,7 +34,7 @@ func (s *server) readSocket(toRingbufferCh chan []subjectAndMessage) {
|
||||||
sam, err := convertBytesToSAM(b)
|
sam, err := convertBytesToSAM(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: malformed json: %v", err)
|
er := fmt.Errorf("error: malformed json: %v", err)
|
||||||
sendErrorLogMessage(toRingbufferCh, node(s.nodeName), er)
|
sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -42,7 +42,7 @@ func (s *server) readSocket(toRingbufferCh chan []subjectAndMessage) {
|
||||||
|
|
||||||
// Fill in the value for the FromNode field, so the receiver
|
// Fill in the value for the FromNode field, so the receiver
|
||||||
// can check this field to know where it came from.
|
// can check this field to know where it came from.
|
||||||
sam[i].Message.FromNode = node(s.nodeName)
|
sam[i].Message.FromNode = Node(s.nodeName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send the SAM struct to be picked up by the ring buffer.
|
// Send the SAM struct to be picked up by the ring buffer.
|
||||||
|
|
|
@ -38,12 +38,12 @@ type ringBuffer struct {
|
||||||
totalMessagesIndex int
|
totalMessagesIndex int
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
permStore chan string
|
permStore chan string
|
||||||
nodeName node
|
nodeName Node
|
||||||
newMessagesCh chan []subjectAndMessage
|
newMessagesCh chan []subjectAndMessage
|
||||||
}
|
}
|
||||||
|
|
||||||
// newringBuffer is a push/pop storage for values.
|
// newringBuffer is a push/pop storage for values.
|
||||||
func newringBuffer(c Configuration, size int, dbFileName string, nodeName node, newMessagesCh chan []subjectAndMessage) *ringBuffer {
|
func newringBuffer(c Configuration, size int, dbFileName string, nodeName Node, newMessagesCh chan []subjectAndMessage) *ringBuffer {
|
||||||
// ---
|
// ---
|
||||||
// Check if socket folder exists, if not create it
|
// Check if socket folder exists, if not create it
|
||||||
if _, err := os.Stat(c.DatabaseFolder); os.IsNotExist(err) {
|
if _, err := os.Stat(c.DatabaseFolder); os.IsNotExist(err) {
|
||||||
|
@ -134,7 +134,7 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
|
||||||
// Check if the command or event exists in commandOrEvent.go
|
// Check if the command or event exists in commandOrEvent.go
|
||||||
if !coeAvailable.CheckIfExists(v.CommandOrEvent, v.Subject) {
|
if !coeAvailable.CheckIfExists(v.CommandOrEvent, v.Subject) {
|
||||||
er := fmt.Errorf("error: fillBuffer: the event or command type do not exist, so this message will not be put on the buffer to be processed. Check the syntax used in the json file for the message. Allowed values are : %v, where given: coe=%v, with subject=%v", coeAvailableValues, v.CommandOrEvent, v.Subject)
|
er := fmt.Errorf("error: fillBuffer: the event or command type do not exist, so this message will not be put on the buffer to be processed. Check the syntax used in the json file for the message. Allowed values are : %v, where given: coe=%v, with subject=%v", coeAvailableValues, v.CommandOrEvent, v.Subject)
|
||||||
sendErrorLogMessage(r.newMessagesCh, node(r.nodeName), er)
|
sendErrorLogMessage(r.newMessagesCh, Node(r.nodeName), er)
|
||||||
|
|
||||||
fmt.Println()
|
fmt.Println()
|
||||||
// if it was not a valid value, we jump back up, and
|
// if it was not a valid value, we jump back up, and
|
||||||
|
@ -167,14 +167,14 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
|
||||||
js, err := json.Marshal(samV)
|
js, err := json.Marshal(samV)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error:fillBuffer gob encoding samValue: %v", err)
|
er := fmt.Errorf("error:fillBuffer gob encoding samValue: %v", err)
|
||||||
sendErrorLogMessage(r.newMessagesCh, node(r.nodeName), er)
|
sendErrorLogMessage(r.newMessagesCh, Node(r.nodeName), er)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store the incomming message in key/value store
|
// Store the incomming message in key/value store
|
||||||
err = r.dbUpdate(r.db, samValueBucket, strconv.Itoa(dbID), js)
|
err = r.dbUpdate(r.db, samValueBucket, strconv.Itoa(dbID), js)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: dbUpdate samValue failed: %v", err)
|
er := fmt.Errorf("error: dbUpdate samValue failed: %v", err)
|
||||||
sendErrorLogMessage(r.newMessagesCh, node(r.nodeName), er)
|
sendErrorLogMessage(r.newMessagesCh, Node(r.nodeName), er)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
12
server.go
12
server.go
|
@ -203,7 +203,7 @@ func (s *server) Start() {
|
||||||
// processes are tied to the process struct, we need to create an
|
// processes are tied to the process struct, we need to create an
|
||||||
// initial process to start the rest.
|
// initial process to start the rest.
|
||||||
sub := newSubject(REQInitial, s.nodeName)
|
sub := newSubject(REQInitial, s.nodeName)
|
||||||
p := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, "", []node{}, nil)
|
p := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, "", []Node{}, nil)
|
||||||
p.ProcessesStart()
|
p.ProcessesStart()
|
||||||
|
|
||||||
time.Sleep(time.Second * 1)
|
time.Sleep(time.Second * 1)
|
||||||
|
@ -234,7 +234,7 @@ func (p *processes) printProcessesMap() {
|
||||||
|
|
||||||
// sendErrorMessage will put the error message directly on the channel that is
|
// sendErrorMessage will put the error message directly on the channel that is
|
||||||
// read by the nats publishing functions.
|
// read by the nats publishing functions.
|
||||||
func sendErrorLogMessage(newMessagesCh chan<- []subjectAndMessage, FromNode node, theError error) {
|
func sendErrorLogMessage(newMessagesCh chan<- []subjectAndMessage, FromNode Node, theError error) {
|
||||||
// NB: Adding log statement here for more visuality during development.
|
// NB: Adding log statement here for more visuality during development.
|
||||||
log.Printf("%v\n", theError)
|
log.Printf("%v\n", theError)
|
||||||
sam := createErrorMsgContent(FromNode, theError)
|
sam := createErrorMsgContent(FromNode, theError)
|
||||||
|
@ -243,7 +243,7 @@ func sendErrorLogMessage(newMessagesCh chan<- []subjectAndMessage, FromNode node
|
||||||
|
|
||||||
// createErrorMsgContent will prepare a subject and message with the content
|
// createErrorMsgContent will prepare a subject and message with the content
|
||||||
// of the error
|
// of the error
|
||||||
func createErrorMsgContent(FromNode node, theError error) subjectAndMessage {
|
func createErrorMsgContent(FromNode Node, theError error) subjectAndMessage {
|
||||||
// Add time stamp
|
// Add time stamp
|
||||||
er := fmt.Sprintf("%v, %v\n", time.Now().UTC(), theError.Error())
|
er := fmt.Sprintf("%v, %v\n", time.Now().UTC(), theError.Error())
|
||||||
|
|
||||||
|
@ -275,7 +275,7 @@ func createErrorMsgContent(FromNode node, theError error) subjectAndMessage {
|
||||||
func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subjectAndMessage) {
|
func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subjectAndMessage) {
|
||||||
// Prepare and start a new ring buffer
|
// Prepare and start a new ring buffer
|
||||||
const bufferSize int = 1000
|
const bufferSize int = 1000
|
||||||
rb := newringBuffer(*s.configuration, bufferSize, dbFileName, node(s.nodeName), s.toRingbufferCh)
|
rb := newringBuffer(*s.configuration, bufferSize, dbFileName, Node(s.nodeName), s.toRingbufferCh)
|
||||||
inCh := make(chan subjectAndMessage)
|
inCh := make(chan subjectAndMessage)
|
||||||
ringBufferOutCh := make(chan samDBValue)
|
ringBufferOutCh := make(chan samDBValue)
|
||||||
// start the ringbuffer.
|
// start the ringbuffer.
|
||||||
|
@ -308,12 +308,12 @@ func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subject
|
||||||
// Check if the format of the message is correct.
|
// Check if the format of the message is correct.
|
||||||
if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok {
|
if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok {
|
||||||
er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method)
|
er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method)
|
||||||
sendErrorLogMessage(s.toRingbufferCh, node(s.nodeName), er)
|
sendErrorLogMessage(s.toRingbufferCh, Node(s.nodeName), er)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if !coeAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) {
|
if !coeAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) {
|
||||||
er := fmt.Errorf("error: routeMessagesToProcess: the command or event do not exist, message dropped: %v", sam.Message.Method)
|
er := fmt.Errorf("error: routeMessagesToProcess: the command or event do not exist, message dropped: %v", sam.Message.Method)
|
||||||
sendErrorLogMessage(s.toRingbufferCh, node(s.nodeName), er)
|
sendErrorLogMessage(s.toRingbufferCh, Node(s.nodeName), er)
|
||||||
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,7 @@ func (p process) ProcessesStart() {
|
||||||
{
|
{
|
||||||
log.Printf("Starting REQOpCommand subscriber: %#v\n", p.node)
|
log.Printf("Starting REQOpCommand subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQOpCommand, string(p.node))
|
sub := newSubject(REQOpCommand, string(p.node))
|
||||||
proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, []node{node(p.configuration.CentralNodeName)}, nil)
|
proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, []Node{Node(p.configuration.CentralNodeName)}, nil)
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker(p.processes, p.natsConn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -99,7 +99,7 @@ func (s startup) pubREQHello(p process) {
|
||||||
log.Printf("Starting Hello Publisher: %#v\n", p.node)
|
log.Printf("Starting Hello Publisher: %#v\n", p.node)
|
||||||
|
|
||||||
sub := newSubject(REQHello, p.configuration.CentralNodeName)
|
sub := newSubject(REQHello, p.configuration.CentralNodeName)
|
||||||
proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindPublisher, []node{}, nil)
|
proc := newProcess(p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindPublisher, []Node{}, nil)
|
||||||
|
|
||||||
// Define the procFunc to be used for the process.
|
// Define the procFunc to be used for the process.
|
||||||
proc.procFunc = procFunc(
|
proc.procFunc = procFunc(
|
||||||
|
@ -111,8 +111,8 @@ func (s startup) pubREQHello(p process) {
|
||||||
d := fmt.Sprintf("Hello from %v\n", p.node)
|
d := fmt.Sprintf("Hello from %v\n", p.node)
|
||||||
|
|
||||||
m := Message{
|
m := Message{
|
||||||
ToNode: node(p.configuration.CentralNodeName),
|
ToNode: Node(p.configuration.CentralNodeName),
|
||||||
FromNode: node(p.node),
|
FromNode: Node(p.node),
|
||||||
Data: []string{d},
|
Data: []string{d},
|
||||||
Method: REQHello,
|
Method: REQHello,
|
||||||
}
|
}
|
||||||
|
@ -189,7 +189,7 @@ func (s startup) subREQHello(p process) {
|
||||||
// of the nodes we've received hello's from in the sayHelloNodes map,
|
// of the nodes we've received hello's from in the sayHelloNodes map,
|
||||||
// which is the information we pass along to generate metrics.
|
// which is the information we pass along to generate metrics.
|
||||||
proc.procFunc = func(ctx context.Context) error {
|
proc.procFunc = func(ctx context.Context) error {
|
||||||
sayHelloNodes := make(map[node]struct{})
|
sayHelloNodes := make(map[Node]struct{})
|
||||||
|
|
||||||
promHelloNodes := promauto.NewGauge(prometheus.GaugeOpts{
|
promHelloNodes := promauto.NewGauge(prometheus.GaugeOpts{
|
||||||
Name: "hello_nodes_total",
|
Name: "hello_nodes_total",
|
||||||
|
|
|
@ -14,6 +14,8 @@ import (
|
||||||
|
|
||||||
"github.com/gdamore/tcell/v2"
|
"github.com/gdamore/tcell/v2"
|
||||||
"github.com/rivo/tview"
|
"github.com/rivo/tview"
|
||||||
|
|
||||||
|
"github.com/RaaLabs/steward"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Stew struct {
|
type Stew struct {
|
||||||
|
@ -125,7 +127,7 @@ func (p *pageMessage) start() error {
|
||||||
// template, and we can add case statements for those fields below
|
// template, and we can add case statements for those fields below
|
||||||
// that we do not wan't to check.
|
// that we do not wan't to check.
|
||||||
func compareMsgAndMessage() error {
|
func compareMsgAndMessage() error {
|
||||||
stewardMessage := Message{}
|
stewardMessage := steward.Message{}
|
||||||
stewMsg := msg{}
|
stewMsg := msg{}
|
||||||
|
|
||||||
stewardRefVal := reflect.ValueOf(stewardMessage)
|
stewardRefVal := reflect.ValueOf(stewardMessage)
|
||||||
|
@ -206,16 +208,16 @@ func (p *pageMessage) drawMsgForm() error {
|
||||||
value := `"bash","-c","..."`
|
value := `"bash","-c","..."`
|
||||||
p.msgInputForm.AddInputField(fieldName, value, 30, nil, nil)
|
p.msgInputForm.AddInputField(fieldName, value, 30, nil, nil)
|
||||||
case "Method":
|
case "Method":
|
||||||
var m Method
|
var m steward.Method
|
||||||
ma := m.GetMethodsAvailable()
|
ma := m.GetMethodsAvailable()
|
||||||
values := []string{}
|
values := []string{}
|
||||||
for k := range ma.methodhandlers {
|
for k := range ma.Methodhandlers {
|
||||||
values = append(values, string(k))
|
values = append(values, string(k))
|
||||||
}
|
}
|
||||||
p.msgInputForm.AddDropDown(fieldName, values, 0, nil).SetItemPadding(1)
|
p.msgInputForm.AddDropDown(fieldName, values, 0, nil).SetItemPadding(1)
|
||||||
case "ReplyMethod":
|
case "ReplyMethod":
|
||||||
var m Method
|
var m steward.Method
|
||||||
rm := m.getReplyMethods()
|
rm := m.GetReplyMethods()
|
||||||
values := []string{}
|
values := []string{}
|
||||||
for _, k := range rm {
|
for _, k := range rm {
|
||||||
values = append(values, string(k))
|
values = append(values, string(k))
|
||||||
|
@ -270,10 +272,10 @@ func (p *pageMessage) drawMsgForm() error {
|
||||||
formItems := []formItem{}
|
formItems := []formItem{}
|
||||||
|
|
||||||
// Get values values to be used for the "Method" dropdown.
|
// Get values values to be used for the "Method" dropdown.
|
||||||
var m Method
|
var m steward.Method
|
||||||
ma := m.GetMethodsAvailable()
|
ma := m.GetMethodsAvailable()
|
||||||
values := []string{}
|
values := []string{}
|
||||||
for k := range ma.methodhandlers {
|
for k := range ma.Methodhandlers {
|
||||||
values = append(values, string(k))
|
values = append(values, string(k))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -319,10 +321,10 @@ func (p *pageMessage) drawMsgForm() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get values values to be used for the "Method" dropdown.
|
// Get values values to be used for the "Method" dropdown.
|
||||||
var m Method
|
var m steward.Method
|
||||||
ma := m.GetMethodsAvailable()
|
ma := m.GetMethodsAvailable()
|
||||||
values := []string{}
|
values := []string{}
|
||||||
for k := range ma.methodhandlers {
|
for k := range ma.Methodhandlers {
|
||||||
values = append(values, string(k))
|
values = append(values, string(k))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -406,8 +408,8 @@ func (p *pageMessage) drawMsgForm() error {
|
||||||
// TODO: Should also add a write directly to socket here.
|
// TODO: Should also add a write directly to socket here.
|
||||||
AddButton("generate to console", func() {
|
AddButton("generate to console", func() {
|
||||||
// ---
|
// ---
|
||||||
opCmdStartProc := OpCmdStartProc{}
|
opCmdStartProc := steward.OpCmdStartProc{}
|
||||||
opCmdStopProc := OpCmdStopProc{}
|
opCmdStopProc := steward.OpCmdStopProc{}
|
||||||
// ---
|
// ---
|
||||||
|
|
||||||
// fh, err := os.Create("message.json")
|
// fh, err := os.Create("message.json")
|
||||||
|
@ -432,7 +434,7 @@ func (p *pageMessage) drawMsgForm() error {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
m.ToNode = node(value)
|
m.ToNode = steward.Node(value)
|
||||||
case "Data":
|
case "Data":
|
||||||
// Split the comma separated string into a
|
// Split the comma separated string into a
|
||||||
// and remove the start and end ampersand.
|
// and remove the start and end ampersand.
|
||||||
|
@ -457,9 +459,9 @@ func (p *pageMessage) drawMsgForm() error {
|
||||||
|
|
||||||
m.Data = data
|
m.Data = data
|
||||||
case "Method":
|
case "Method":
|
||||||
m.Method = Method(value)
|
m.Method = steward.Method(value)
|
||||||
case "ReplyMethod":
|
case "ReplyMethod":
|
||||||
m.ReplyMethod = Method(value)
|
m.ReplyMethod = steward.Method(value)
|
||||||
case "ACKTimeout":
|
case "ACKTimeout":
|
||||||
v, _ := strconv.Atoi(value)
|
v, _ := strconv.Atoi(value)
|
||||||
m.ACKTimeout = v
|
m.ACKTimeout = v
|
||||||
|
@ -497,13 +499,13 @@ func (p *pageMessage) drawMsgForm() error {
|
||||||
fmt.Fprintf(p.logForm, "%v : error: missing startProc Method\n", time.Now().Format("Mon Jan _2 15:04:05 2006"))
|
fmt.Fprintf(p.logForm, "%v : error: missing startProc Method\n", time.Now().Format("Mon Jan _2 15:04:05 2006"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
opCmdStartProc.Method = Method(value)
|
opCmdStartProc.Method = steward.Method(value)
|
||||||
case "startProc AllowedNodes":
|
case "startProc AllowedNodes":
|
||||||
// Split the comma separated string into a
|
// Split the comma separated string into a
|
||||||
// and remove the start and end ampersand.
|
// and remove the start and end ampersand.
|
||||||
sp := strings.Split(value, ",")
|
sp := strings.Split(value, ",")
|
||||||
|
|
||||||
var allowedNodes []node
|
var allowedNodes []steward.Node
|
||||||
|
|
||||||
for _, v := range sp {
|
for _, v := range sp {
|
||||||
// Check if format is correct, return if not.
|
// Check if format is correct, return if not.
|
||||||
|
@ -517,7 +519,7 @@ func (p *pageMessage) drawMsgForm() error {
|
||||||
v = v[1:]
|
v = v[1:]
|
||||||
v = strings.TrimSuffix(v, "\"")
|
v = strings.TrimSuffix(v, "\"")
|
||||||
|
|
||||||
allowedNodes = append(allowedNodes, node(v))
|
allowedNodes = append(allowedNodes, steward.Node(v))
|
||||||
}
|
}
|
||||||
|
|
||||||
opCmdStartProc.AllowedNodes = allowedNodes
|
opCmdStartProc.AllowedNodes = allowedNodes
|
||||||
|
@ -604,15 +606,15 @@ func getNodeNames(filePath string) ([]string, error) {
|
||||||
// empty fields.
|
// empty fields.
|
||||||
type msg struct {
|
type msg struct {
|
||||||
// The node to send the message to
|
// The node to send the message to
|
||||||
ToNode node `json:"toNode" yaml:"toNode"`
|
ToNode steward.Node `json:"toNode" yaml:"toNode"`
|
||||||
// The actual data in the message
|
// The actual data in the message
|
||||||
Data []string `json:"data" yaml:"data"`
|
Data []string `json:"data" yaml:"data"`
|
||||||
// Method, what is this message doing, etc. CLI, syslog, etc.
|
// Method, what is this message doing, etc. CLI, syslog, etc.
|
||||||
Method Method `json:"method" yaml:"method"`
|
Method steward.Method `json:"method" yaml:"method"`
|
||||||
// ReplyMethod, is the method to use for the reply message.
|
// ReplyMethod, is the method to use for the reply message.
|
||||||
// By default the reply method will be set to log to file, but
|
// By default the reply method will be set to log to file, but
|
||||||
// you can override it setting your own here.
|
// you can override it setting your own here.
|
||||||
ReplyMethod Method `json:"replyMethod" yaml:"replyMethod"`
|
ReplyMethod steward.Method `json:"replyMethod" yaml:"replyMethod"`
|
||||||
// From what node the message originated
|
// From what node the message originated
|
||||||
ACKTimeout int `json:"ACKTimeout" yaml:"ACKTimeout"`
|
ACKTimeout int `json:"ACKTimeout" yaml:"ACKTimeout"`
|
||||||
// Resend retries
|
// Resend retries
|
|
@ -136,7 +136,7 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
|
||||||
// Command, Used to make a request to perform an action
|
// Command, Used to make a request to perform an action
|
||||||
// Event, Used to communicate that an action has been performed.
|
// Event, Used to communicate that an action has been performed.
|
||||||
ma := MethodsAvailable{
|
ma := MethodsAvailable{
|
||||||
methodhandlers: map[Method]methodHandler{
|
Methodhandlers: map[Method]methodHandler{
|
||||||
REQInitial: methodREQInitial{
|
REQInitial: methodREQInitial{
|
||||||
commandOrEvent: CommandACK,
|
commandOrEvent: CommandACK,
|
||||||
},
|
},
|
||||||
|
@ -186,7 +186,7 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reply methods
|
// Reply methods
|
||||||
func (m Method) getReplyMethods() []Method {
|
func (m Method) GetReplyMethods() []Method {
|
||||||
rm := []Method{REQToConsole, REQToFile, REQToFileAppend}
|
rm := []Method{REQToConsole, REQToFile, REQToFileAppend}
|
||||||
return rm
|
return rm
|
||||||
}
|
}
|
||||||
|
@ -196,7 +196,7 @@ func (m Method) getReplyMethods() []Method {
|
||||||
// as input argument.
|
// as input argument.
|
||||||
func (m Method) getHandler(method Method) methodHandler {
|
func (m Method) getHandler(method Method) methodHandler {
|
||||||
ma := m.GetMethodsAvailable()
|
ma := m.GetMethodsAvailable()
|
||||||
mh := ma.methodhandlers[method]
|
mh := ma.Methodhandlers[method]
|
||||||
|
|
||||||
return mh
|
return mh
|
||||||
}
|
}
|
||||||
|
@ -223,14 +223,14 @@ func (m methodREQInitial) handler(proc process, message Message, node string) ([
|
||||||
|
|
||||||
// ----
|
// ----
|
||||||
type MethodsAvailable struct {
|
type MethodsAvailable struct {
|
||||||
methodhandlers map[Method]methodHandler
|
Methodhandlers map[Method]methodHandler
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if exists will check if the Method is defined. If true the bool
|
// Check if exists will check if the Method is defined. If true the bool
|
||||||
// value will be set to true, and the methodHandler function for that type
|
// value will be set to true, and the methodHandler function for that type
|
||||||
// will be returned.
|
// will be returned.
|
||||||
func (ma MethodsAvailable) CheckIfExists(m Method) (methodHandler, bool) {
|
func (ma MethodsAvailable) CheckIfExists(m Method) (methodHandler, bool) {
|
||||||
mFunc, ok := ma.methodhandlers[m]
|
mFunc, ok := ma.Methodhandlers[m]
|
||||||
if ok {
|
if ok {
|
||||||
// fmt.Printf("******THE TOPIC EXISTS: %v******\n", m)
|
// fmt.Printf("******THE TOPIC EXISTS: %v******\n", m)
|
||||||
return mFunc, true
|
return mFunc, true
|
||||||
|
@ -261,11 +261,11 @@ func (m methodREQOpCommand) getKind() CommandOrEvent {
|
||||||
|
|
||||||
type OpCmdStartProc struct {
|
type OpCmdStartProc struct {
|
||||||
Method Method `json:"method"`
|
Method Method `json:"method"`
|
||||||
AllowedNodes []node `json:"allowedNodes"`
|
AllowedNodes []Node `json:"allowedNodes"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type OpCmdStopProc struct {
|
type OpCmdStopProc struct {
|
||||||
RecevingNode node `json:"receivingNode"`
|
RecevingNode Node `json:"receivingNode"`
|
||||||
Method Method `json:"method"`
|
Method Method `json:"method"`
|
||||||
Kind processKind `json:"kind"`
|
Kind processKind `json:"kind"`
|
||||||
ID int `json:"id"`
|
ID int `json:"id"`
|
||||||
|
|
Loading…
Reference in a new issue