1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

fixed procFuncs

This commit is contained in:
postmannen 2021-03-05 13:10:46 +01:00
parent abf8c0d3c7
commit ae0e080f80
8 changed files with 58 additions and 28 deletions

View file

@ -110,6 +110,8 @@ and for a shell command of type command to a host named "ship2"
## TODO ## TODO
- FIX so it can handle multiple slices of input for inmsg.txt
- Add config file options to use when starting up the program for options. - Add config file options to use when starting up the program for options.
- Rename CLICommand to cli - Rename CLICommand to cli

View file

@ -0,0 +1,18 @@
[
{
"toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":5,
"retries":10
},
{
"toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":5,
"retries":10
}
]

Binary file not shown.

View file

@ -115,7 +115,7 @@ func (p process) spawnWorker(s *server) {
if p.processKind == processKindSubscriber { if p.processKind == processKindSubscriber {
// If there is a procFunc for the process, start it. // If there is a procFunc for the process, start it.
if p.procFunc != nil { if p.procFunc != nil {
p.procFuncCh = make(chan Message) // REMOVED: p.procFuncCh = make(chan Message)
// Start the procFunc in it's own anonymous func so we are able // Start the procFunc in it's own anonymous func so we are able
// to get the return error. // to get the return error.
go func() { go func() {
@ -125,6 +125,8 @@ func (p process) spawnWorker(s *server) {
} }
}() }()
} }
//fmt.Printf("-- DEBUG 1.1: %#v, %#v, %#v\n\n", p.subject.name(), p.procFunc, p.procFuncCh)
p.subscribeMessages(s) p.subscribeMessages(s)
} }
} }
@ -283,6 +285,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
// //
// since we don't send a reply for a NACK message, we don't care about the // since we don't send a reply for a NACK message, we don't care about the
// out return when calling mf.handler // out return when calling mf.handler
//fmt.Printf("-- DEBUG 2.2.1: %#v\n\n", p.subject)
_, err := mf.handler(s, p, message, thisNode) _, err := mf.handler(s, p, message, thisNode)
if err != nil { if err != nil {
@ -297,11 +300,13 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
// Subscribe will start up a Go routine under the hood calling the // Subscribe will start up a Go routine under the hood calling the
// callback function specified when a new message is received. // callback function specified when a new message is received.
func (p process) subscribeMessages(s *server) { func (p process) subscribeMessages(s *server) {
//fmt.Printf("-- DEBUG 2.1: %#v, %#v, %#v\n\n", p.subject.name(), p.procFunc, p.procFuncCh)
subject := string(p.subject.name()) subject := string(p.subject.name())
_, err := s.natsConn.Subscribe(subject, func(msg *nats.Msg) { _, err := s.natsConn.Subscribe(subject, func(msg *nats.Msg) {
// We start one handler per message received by using go routines here. // We start one handler per message received by using go routines here.
// This is for being able to reply back the current publisher who sent // This is for being able to reply back the current publisher who sent
// the message. // the message.
//fmt.Printf("-- DEBUG 2.2: %#v, %#v, %#v\n\n", p.subject.name(), p.procFunc, p.procFuncCh)
go p.subscriberHandler(s.natsConn, s.nodeName, msg, s) go p.subscriberHandler(s.natsConn, s.nodeName, msg, s)
}) })
if err != nil { if err != nil {

View file

@ -42,12 +42,12 @@ func (s *sayHelloPublisher) createMsg(FromNode node) subjectAndMessage {
sam := subjectAndMessage{ sam := subjectAndMessage{
Subject: Subject{ Subject: Subject{
ToNode: "errorCentral", ToNode: "central",
CommandOrEvent: EventNACK, CommandOrEvent: EventNACK,
Method: ErrorLog, Method: SayHello,
}, },
Message: Message{ Message: Message{
ToNode: "errorCentral", ToNode: "central",
FromNode: FromNode, FromNode: FromNode,
Data: []string{m}, Data: []string{m},
Method: SayHello, Method: SayHello,

View file

@ -263,7 +263,7 @@ func (r *ringBuffer) printBucketContent(bucket string) error {
err := r.db.View(func(tx *bolt.Tx) error { err := r.db.View(func(tx *bolt.Tx) error {
bu := tx.Bucket([]byte(bucket)) bu := tx.Bucket([]byte(bucket))
fmt.Println("--------------------------K/V STORE DUMP---------------------------") fmt.Println("-- K/V STORE DUMP--")
bu.ForEach(func(k, v []byte) error { bu.ForEach(func(k, v []byte) error {
var vv samDBValue var vv samDBValue
err := json.Unmarshal(v, &vv) err := json.Unmarshal(v, &vv)
@ -273,7 +273,7 @@ func (r *ringBuffer) printBucketContent(bucket string) error {
fmt.Printf("k: %s, v: %v\n", k, vv) fmt.Printf("k: %s, v: %v\n", k, vv)
return nil return nil
}) })
fmt.Println("-------------------------------------------------------------------") fmt.Println("--")
return nil return nil
}) })

View file

@ -68,7 +68,7 @@ type Method string
// - EventNack // - EventNack
func (m Method) GetMethodsAvailable() MethodsAvailable { func (m Method) GetMethodsAvailable() MethodsAvailable {
ma := MethodsAvailable{ ma := MethodsAvailable{
topics: map[Method]methodHandler{ methodhandlers: map[Method]methodHandler{
CLICommand: methodCommandCLICommand{ CLICommand: methodCommandCLICommand{
commandOrEvent: CommandACK, commandOrEvent: CommandACK,
}, },
@ -92,20 +92,20 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
// as input argument. // as input argument.
func (m Method) getHandler(method Method) methodHandler { func (m Method) getHandler(method Method) methodHandler {
ma := m.GetMethodsAvailable() ma := m.GetMethodsAvailable()
mh := ma.topics[method] mh := ma.methodhandlers[method]
return mh return mh
} }
type MethodsAvailable struct { type MethodsAvailable struct {
topics map[Method]methodHandler methodhandlers map[Method]methodHandler
} }
// Check if exists will check if the Method is defined. If true the bool // Check if exists will check if the Method is defined. If true the bool
// value will be set to true, and the methodHandler function for that type // value will be set to true, and the methodHandler function for that type
// will be returned. // will be returned.
func (ma MethodsAvailable) CheckIfExists(m Method) (methodHandler, bool) { func (ma MethodsAvailable) CheckIfExists(m Method) (methodHandler, bool) {
mFunc, ok := ma.topics[m] mFunc, ok := ma.methodhandlers[m]
if ok { if ok {
// fmt.Printf("******THE TOPIC EXISTS: %v******\n", m) // fmt.Printf("******THE TOPIC EXISTS: %v******\n", m)
return mFunc, true return mFunc, true
@ -201,7 +201,10 @@ func (m methodEventSayHello) getKind() CommandOrEvent {
} }
func (m methodEventSayHello) handler(s *server, proc process, message Message, node string) ([]byte, error) { func (m methodEventSayHello) handler(s *server, proc process, message Message, node string) ([]byte, error) {
log.Printf("<--- Received hello from %v \n", message.FromNode) //fmt.Printf("-- DEBUG 3.1: %#v, %#v, %#v\n\n", proc.subject.name(), proc.procFunc, proc.procFuncCh)
//pn := processNameGet(proc.subject.name(), processKindSubscriber)
//fmt.Printf("-- DEBUG 3.2: pn = %#v\n\n", pn)
log.Printf("<--- Received hello from %#v\n", message.FromNode)
// Since the handler is only called to handle a specific type of message we need // Since the handler is only called to handle a specific type of message we need
// to store it elsewhere, and choice for now is under s.metrics.sayHelloNodes // to store it elsewhere, and choice for now is under s.metrics.sayHelloNodes

View file

@ -30,7 +30,25 @@ func (s *server) subscribersStart() {
fmt.Printf("Starting SayHello subscriber: %#v\n", s.nodeName) fmt.Printf("Starting SayHello subscriber: %#v\n", s.nodeName)
sub := newSubject(SayHello, EventNACK, s.nodeName) sub := newSubject(SayHello, EventNACK, s.nodeName)
proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil) proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil)
// fmt.Printf("*** %#v\n", proc) proc.procFuncCh = make(chan Message)
proc.procFunc = func() error {
sayHelloNodes := make(map[node]struct{})
for {
//fmt.Printf("-- DEBUG 4.1: procFunc %v, procFuncCh %v\n\n", proc.procFunc, proc.procFuncCh)
m := <-proc.procFuncCh
fmt.Printf("-----------DEBUG : THIS IS THE procFunc BEING CALLED !!!!! ---------\n")
sayHelloNodes[m.FromNode] = struct{}{}
// update the prometheus metrics
s.metrics.metricsCh <- metricType{
metric: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "hello_nodes",
Help: "The current number of total nodes who have said hello",
}),
value: float64(len(sayHelloNodes)),
}
}
}
go proc.spawnWorker(s) go proc.spawnWorker(s)
} }
@ -40,22 +58,6 @@ func (s *server) subscribersStart() {
fmt.Printf("Starting ErrorLog subscriber: %#v\n", s.nodeName) fmt.Printf("Starting ErrorLog subscriber: %#v\n", s.nodeName)
sub := newSubject(ErrorLog, EventNACK, "errorCentral") sub := newSubject(ErrorLog, EventNACK, "errorCentral")
proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil) proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil)
proc.procFunc = func() error {
sayHelloNodes := make(map[node]struct{})
for {
m := <-proc.procFuncCh
sayHelloNodes[m.FromNode] = struct{}{}
// update the prometheus metrics
s.metrics.metricsCh <- metricType{
metric: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "hello_nodes",
Help: "The current number of total nodes who have said hello",
}),
value: float64(len(sayHelloNodes)),
}
}
}
go proc.spawnWorker(s) go proc.spawnWorker(s)
} }
} }