1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-15 17:51:15 +00:00

added say hello handler, prometheus metrics, and correct handling of NACK messages

This commit is contained in:
postmannen 2021-02-18 14:27:53 +01:00
parent 6b59608c82
commit 0fabb73cd9
8 changed files with 80 additions and 17 deletions

View file

@ -0,0 +1,8 @@
[
{
"toNode": "central",
"data": [""],
"commandOrEvent":"eventNACK",
"method":"sayHello"
}
]

Binary file not shown.

View file

@ -10,15 +10,22 @@ import (
) )
type metrics struct { type metrics struct {
totalRunningProcesses prometheus.Gauge helloNodes map[node]struct{}
HelloNodes prometheus.Gauge
TotalRunningProcesses prometheus.Gauge
} }
func newMetrics() *metrics { func newMetrics() *metrics {
m := metrics{ m := metrics{
totalRunningProcesses: promauto.NewGauge(prometheus.GaugeOpts{ helloNodes: make(map[node]struct{}),
TotalRunningProcesses: promauto.NewGauge(prometheus.GaugeOpts{
Name: "total_running_processes", Name: "total_running_processes",
Help: "The current number of total running processes", Help: "The current number of total running processes",
}), }),
HelloNodes: promauto.NewGauge(prometheus.GaugeOpts{
Name: "hello_nodes",
Help: "The current number of total nodes who have said hello",
}),
} }
return &m return &m
@ -28,7 +35,8 @@ func (s *server) startMetrics() {
go func() { go func() {
for { for {
s.metrics.totalRunningProcesses.Set(float64(len(s.processes))) s.metrics.TotalRunningProcesses.Set(float64(len(s.processes)))
s.metrics.HelloNodes.Set(float64(len(s.metrics.helloNodes)))
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
} }
}() }()

View file

@ -110,7 +110,7 @@ func (s *server) Start() {
// Start a subscriber for shellCommand messages // Start a subscriber for shellCommand messages
{ {
fmt.Printf("nodeName: %#v\n", s.nodeName) fmt.Printf("Starting shellCommand subscriber: %#v\n", s.nodeName)
sub := newSubject(s.nodeName, CommandACK, "shellCommand") sub := newSubject(s.nodeName, CommandACK, "shellCommand")
proc := s.processPrepareNew(sub, s.errorCh, processKindSubscriber) proc := s.processPrepareNew(sub, s.errorCh, processKindSubscriber)
// fmt.Printf("*** %#v\n", proc) // fmt.Printf("*** %#v\n", proc)
@ -119,13 +119,22 @@ func (s *server) Start() {
// Start a subscriber for textLogging messages // Start a subscriber for textLogging messages
{ {
fmt.Printf("nodeName: %#v\n", s.nodeName) fmt.Printf("Starting textlogging subscriber: %#v\n", s.nodeName)
sub := newSubject(s.nodeName, EventACK, "textLogging") sub := newSubject(s.nodeName, EventACK, "textLogging")
proc := s.processPrepareNew(sub, s.errorCh, processKindSubscriber) proc := s.processPrepareNew(sub, s.errorCh, processKindSubscriber)
// fmt.Printf("*** %#v\n", proc) // fmt.Printf("*** %#v\n", proc)
go s.processSpawnWorker(proc) go s.processSpawnWorker(proc)
} }
// Start a subscriber for sayHello messages
{
fmt.Printf("Starting sayHello subscriber: %#v\n", s.nodeName)
sub := newSubject(s.nodeName, EventNACK, "sayHello")
proc := s.processPrepareNew(sub, s.errorCh, processKindSubscriber)
// fmt.Printf("*** %#v\n", proc)
go s.processSpawnWorker(proc)
}
time.Sleep(time.Second * 2) time.Sleep(time.Second * 2)
s.printProcessesMap() s.printProcessesMap()
@ -407,15 +416,19 @@ func messageDeliver(proc process, message Message, natsConn *nats.Conn) {
continue continue
} }
// If the message is an ACK type of message we must check that a
// reply, and if it is not we don't wait here at all.
if message.CommandOrEvent == CommandACK || message.CommandOrEvent == EventACK {
// Wait up until 10 seconds for a reply, // Wait up until 10 seconds for a reply,
// continue and resend if to reply received. // continue and resend if to reply received.
msgReply, err := subReply.NextMsg(time.Second * 10) msgReply, err := subReply.NextMsg(time.Second * 10)
if err != nil { if err != nil {
log.Printf("error: subRepl.NextMsg failed for node=%v, subject=%v: %v\n", proc.node, proc.subject.name(), err) log.Printf("error: subReply.NextMsg failed for node=%v, subject=%v: %v\n", proc.node, proc.subject.name(), err)
// did not receive a reply, continuing from top again // did not receive a reply, continuing from top again
continue continue
} }
log.Printf("publisher: received ACK: %s\n", msgReply.Data) log.Printf("publisher: received ACK: %s\n", msgReply.Data)
}
return return
} }
} }
@ -463,23 +476,38 @@ func (s *server) subscriberHandler(natsConn *nats.Conn, node string, msg *nats.M
// method etc. // method etc.
switch { switch {
case message.CommandOrEvent == CommandACK || message.CommandOrEvent == EventACK: case message.CommandOrEvent == CommandACK || message.CommandOrEvent == EventACK:
fmt.Printf("* message.CommandOrEvent received was = %v\n", message.CommandOrEvent) log.Printf("info: subscriberHandler: message.CommandOrEvent received was = %v, preparing to call handler\n", message.CommandOrEvent)
mf, ok := s.methodsAvailable.CheckIfExists(message.Method) mf, ok := s.methodsAvailable.CheckIfExists(message.Method)
if !ok { if !ok {
// TODO: Check how errors should be handled here!!! // TODO: Check how errors should be handled here!!!
log.Printf("*****METHOD MISSING \n") log.Printf("error: subscriberHandler: method type not available: %v\n", message.CommandOrEvent)
} }
fmt.Printf("*** DEBUG: BEFORE CALLING HANDLER: ACK\n")
out, err := mf.handler(s, message, node) out, err := mf.handler(s, message, node)
if err != nil { if err != nil {
// TODO: Send to error kernel ? // TODO: Send to error kernel ?
log.Printf("error: failed to execute event: %v\n", err) log.Printf("error: subscriberHandler: failed to execute event: %v\n", err)
} }
// Send a confirmation message back to the publisher // Send a confirmation message back to the publisher
natsConn.Publish(msg.Reply, out) natsConn.Publish(msg.Reply, out)
case message.CommandOrEvent == CommandNACK || message.CommandOrEvent == EventNACK: case message.CommandOrEvent == CommandNACK || message.CommandOrEvent == EventNACK:
fmt.Printf("* message.CommandOrEvent received was = %v\n", message.CommandOrEvent) log.Printf("info: subscriberHandler: message.CommandOrEvent received was = %v, preparing to call handler\n", message.CommandOrEvent)
mf, ok := s.methodsAvailable.CheckIfExists(message.Method)
if !ok {
// TODO: Check how errors should be handled here!!!
log.Printf("error: subscriberHandler: method type not available: %v\n", message.CommandOrEvent)
}
// since we don't send a reply for a NACK message, we don't care about the
// out return when calling mf.handler
fmt.Printf("*** DEBUG: BEFORE CALLING HANDLER: NACK\n")
_, err := mf.handler(s, message, node)
if err != nil {
// TODO: Send to error kernel ?
log.Printf("error: subscriberHandler: failed to execute event: %v\n", err)
}
default: default:
log.Printf("info: did not find that specific type of command: %#v\n", message.CommandOrEvent) log.Printf("info: did not find that specific type of command: %#v\n", message.CommandOrEvent)
} }

View file

@ -23,6 +23,7 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
topics: map[Method]methodHandler{ topics: map[Method]methodHandler{
ShellCommand: methodCommandShellCommand{}, ShellCommand: methodCommandShellCommand{},
TextLogging: methodEventTextLogging{}, TextLogging: methodEventTextLogging{},
SayHello: methodEventSayHello{},
}, },
} }
@ -34,6 +35,8 @@ const (
ShellCommand Method = "shellCommand" ShellCommand Method = "shellCommand"
// Send text logging to some host // Send text logging to some host
TextLogging Method = "textLogging" TextLogging Method = "textLogging"
// Send Hello I'm here message
SayHello Method = "sayHello"
) )
type MethodsAvailable struct { type MethodsAvailable struct {
@ -54,6 +57,8 @@ func (ma MethodsAvailable) CheckIfExists(m Method) (methodHandler, bool) {
} }
} }
// ------------------------------------------------------------
// Subscriber method handlers
// ------------------------------------------------------------ // ------------------------------------------------------------
type methodHandler interface { type methodHandler interface {
@ -79,6 +84,8 @@ func (m methodCommandShellCommand) handler(s *server, message Message, node stri
return outMsg, nil return outMsg, nil
} }
// -----
type methodEventTextLogging struct{} type methodEventTextLogging struct{}
func (m methodEventTextLogging) handler(s *server, message Message, node string) ([]byte, error) { func (m methodEventTextLogging) handler(s *server, message Message, node string) ([]byte, error) {
@ -89,3 +96,15 @@ func (m methodEventTextLogging) handler(s *server, message Message, node string)
outMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) outMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return outMsg, nil return outMsg, nil
} }
// -----
type methodEventSayHello struct{}
func (m methodEventSayHello) handler(s *server, message Message, node string) ([]byte, error) {
log.Printf("################## Received hello from %v ##################\n", message.FromNode)
s.metrics.helloNodes[message.FromNode] = struct{}{}
outMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return outMsg, nil
}