diff --git a/README.md b/README.md index 5ab39e1..e28b2a5 100644 --- a/README.md +++ b/README.md @@ -110,6 +110,8 @@ and for a shell command of type command to a host named "ship2" ## TODO +- FIX so it can handle multiple slices of input for inmsg.txt + - Add config file options to use when starting up the program for options. - Rename CLICommand to cli diff --git a/example/toShip1TwoMessages.json b/example/toShip1TwoMessages.json new file mode 100644 index 0000000..b260195 --- /dev/null +++ b/example/toShip1TwoMessages.json @@ -0,0 +1,18 @@ +[ + { + + "toNode": "ship1", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":5, + "retries":10 + }, + { + + "toNode": "ship1", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":5, + "retries":10 + } +] \ No newline at end of file diff --git a/incommmingBuffer.db b/incommmingBuffer.db index af5436d..f68d5ec 100644 Binary files a/incommmingBuffer.db and b/incommmingBuffer.db differ diff --git a/process.go b/process.go index 292f939..6d95969 100644 --- a/process.go +++ b/process.go @@ -115,7 +115,7 @@ func (p process) spawnWorker(s *server) { if p.processKind == processKindSubscriber { // If there is a procFunc for the process, start it. if p.procFunc != nil { - p.procFuncCh = make(chan Message) + // REMOVED: p.procFuncCh = make(chan Message) // Start the procFunc in it's own anonymous func so we are able // to get the return error. go func() { @@ -125,6 +125,8 @@ func (p process) spawnWorker(s *server) { } }() } + + //fmt.Printf("-- DEBUG 1.1: %#v, %#v, %#v\n\n", p.subject.name(), p.procFunc, p.procFuncCh) p.subscribeMessages(s) } } @@ -283,6 +285,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na // // since we don't send a reply for a NACK message, we don't care about the // out return when calling mf.handler + //fmt.Printf("-- DEBUG 2.2.1: %#v\n\n", p.subject) _, err := mf.handler(s, p, message, thisNode) if err != nil { @@ -297,11 +300,13 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na // Subscribe will start up a Go routine under the hood calling the // callback function specified when a new message is received. func (p process) subscribeMessages(s *server) { + //fmt.Printf("-- DEBUG 2.1: %#v, %#v, %#v\n\n", p.subject.name(), p.procFunc, p.procFuncCh) subject := string(p.subject.name()) _, err := s.natsConn.Subscribe(subject, func(msg *nats.Msg) { // We start one handler per message received by using go routines here. // This is for being able to reply back the current publisher who sent // the message. + //fmt.Printf("-- DEBUG 2.2: %#v, %#v, %#v\n\n", p.subject.name(), p.procFunc, p.procFuncCh) go p.subscriberHandler(s.natsConn, s.nodeName, msg, s) }) if err != nil { diff --git a/publisher-services.go b/publisher-services.go index 06c46cf..e1388f0 100644 --- a/publisher-services.go +++ b/publisher-services.go @@ -42,12 +42,12 @@ func (s *sayHelloPublisher) createMsg(FromNode node) subjectAndMessage { sam := subjectAndMessage{ Subject: Subject{ - ToNode: "errorCentral", + ToNode: "central", CommandOrEvent: EventNACK, - Method: ErrorLog, + Method: SayHello, }, Message: Message{ - ToNode: "errorCentral", + ToNode: "central", FromNode: FromNode, Data: []string{m}, Method: SayHello, diff --git a/ringbuffer.go b/ringbuffer.go index d9d159b..5ba87b2 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -263,7 +263,7 @@ func (r *ringBuffer) printBucketContent(bucket string) error { err := r.db.View(func(tx *bolt.Tx) error { bu := tx.Bucket([]byte(bucket)) - fmt.Println("--------------------------K/V STORE DUMP---------------------------") + fmt.Println("-- K/V STORE DUMP--") bu.ForEach(func(k, v []byte) error { var vv samDBValue err := json.Unmarshal(v, &vv) @@ -273,7 +273,7 @@ func (r *ringBuffer) printBucketContent(bucket string) error { fmt.Printf("k: %s, v: %v\n", k, vv) return nil }) - fmt.Println("-------------------------------------------------------------------") + fmt.Println("--") return nil }) diff --git a/subscriberMethodTypes.go b/subscriberMethodTypes.go index a32bd85..4080ba9 100644 --- a/subscriberMethodTypes.go +++ b/subscriberMethodTypes.go @@ -68,7 +68,7 @@ type Method string // - EventNack func (m Method) GetMethodsAvailable() MethodsAvailable { ma := MethodsAvailable{ - topics: map[Method]methodHandler{ + methodhandlers: map[Method]methodHandler{ CLICommand: methodCommandCLICommand{ commandOrEvent: CommandACK, }, @@ -92,20 +92,20 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { // as input argument. func (m Method) getHandler(method Method) methodHandler { ma := m.GetMethodsAvailable() - mh := ma.topics[method] + mh := ma.methodhandlers[method] return mh } type MethodsAvailable struct { - topics map[Method]methodHandler + methodhandlers map[Method]methodHandler } // Check if exists will check if the Method is defined. If true the bool // value will be set to true, and the methodHandler function for that type // will be returned. func (ma MethodsAvailable) CheckIfExists(m Method) (methodHandler, bool) { - mFunc, ok := ma.topics[m] + mFunc, ok := ma.methodhandlers[m] if ok { // fmt.Printf("******THE TOPIC EXISTS: %v******\n", m) return mFunc, true @@ -201,7 +201,10 @@ func (m methodEventSayHello) getKind() CommandOrEvent { } func (m methodEventSayHello) handler(s *server, proc process, message Message, node string) ([]byte, error) { - log.Printf("<--- Received hello from %v \n", message.FromNode) + //fmt.Printf("-- DEBUG 3.1: %#v, %#v, %#v\n\n", proc.subject.name(), proc.procFunc, proc.procFuncCh) + //pn := processNameGet(proc.subject.name(), processKindSubscriber) + //fmt.Printf("-- DEBUG 3.2: pn = %#v\n\n", pn) + log.Printf("<--- Received hello from %#v\n", message.FromNode) // Since the handler is only called to handle a specific type of message we need // to store it elsewhere, and choice for now is under s.metrics.sayHelloNodes diff --git a/subscribers.go b/subscribers.go index 784e145..52df85e 100644 --- a/subscribers.go +++ b/subscribers.go @@ -30,7 +30,25 @@ func (s *server) subscribersStart() { fmt.Printf("Starting SayHello subscriber: %#v\n", s.nodeName) sub := newSubject(SayHello, EventNACK, s.nodeName) proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil) - // fmt.Printf("*** %#v\n", proc) + proc.procFuncCh = make(chan Message) + proc.procFunc = func() error { + sayHelloNodes := make(map[node]struct{}) + for { + //fmt.Printf("-- DEBUG 4.1: procFunc %v, procFuncCh %v\n\n", proc.procFunc, proc.procFuncCh) + m := <-proc.procFuncCh + fmt.Printf("-----------DEBUG : THIS IS THE procFunc BEING CALLED !!!!! ---------\n") + sayHelloNodes[m.FromNode] = struct{}{} + + // update the prometheus metrics + s.metrics.metricsCh <- metricType{ + metric: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "hello_nodes", + Help: "The current number of total nodes who have said hello", + }), + value: float64(len(sayHelloNodes)), + } + } + } go proc.spawnWorker(s) } @@ -40,22 +58,6 @@ func (s *server) subscribersStart() { fmt.Printf("Starting ErrorLog subscriber: %#v\n", s.nodeName) sub := newSubject(ErrorLog, EventNACK, "errorCentral") proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil) - proc.procFunc = func() error { - sayHelloNodes := make(map[node]struct{}) - for { - m := <-proc.procFuncCh - sayHelloNodes[m.FromNode] = struct{}{} - - // update the prometheus metrics - s.metrics.metricsCh <- metricType{ - metric: prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "hello_nodes", - Help: "The current number of total nodes who have said hello", - }), - value: float64(len(sayHelloNodes)), - } - } - } go proc.spawnWorker(s) } }