From abf8c0d3c774d4bdb3090543f464b13b7f46e74c Mon Sep 17 00:00:00 2001 From: postmannen Date: Thu, 4 Mar 2021 16:27:55 +0100 Subject: [PATCH] added procFunc concept, fixed race and timer issues --- etc/config.toml | 2 +- example/toShip1ManyMessages.json | 100 +++++++++++++++++++++++++++++++ example/toShip2ManyMessages.json | 100 +++++++++++++++++++++++++++++++ process.go | 66 +++++++++++++------- publisher.go | 8 +-- ringbuffer.go | 3 +- server.go | 9 +-- subscriber-services.go | 47 ++++++++------- subscriberMethodTypes.go | 15 ++--- subscribers.go | 27 +++++++-- 10 files changed, 304 insertions(+), 73 deletions(-) create mode 100644 example/toShip1ManyMessages.json create mode 100644 example/toShip2ManyMessages.json diff --git a/etc/config.toml b/etc/config.toml index f518b15..5b2765f 100644 --- a/etc/config.toml +++ b/etc/config.toml @@ -4,7 +4,7 @@ ConfigFolder = "./etc" DefaultMessageRetries = 3 DefaultMessageTimeout = 5 NodeName = "central" -ProfilingPort = "" +ProfilingPort = "6060" PromHostAndPort = ":2112" PublisherServiceSayhello = 0 SubscribersDataFolder = "./data" diff --git a/example/toShip1ManyMessages.json b/example/toShip1ManyMessages.json new file mode 100644 index 0000000..689ab39 --- /dev/null +++ b/example/toShip1ManyMessages.json @@ -0,0 +1,100 @@ +[ + { + "toNode": "ship1", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": "ship1", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": "ship1", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": "ship1", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": "ship1", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": 
"ship1", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": "ship1", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": "ship1", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": "ship1", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": "ship1", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": "ship1", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": "ship1", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": "ship1", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": "ship1", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + } +] \ No newline at end of file diff --git a/example/toShip2ManyMessages.json b/example/toShip2ManyMessages.json new file mode 100644 index 0000000..f98bd11 --- /dev/null +++ b/example/toShip2ManyMessages.json @@ -0,0 +1,100 @@ +[ + { + "toNode": "ship2", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": "ship2", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": "ship2", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": "ship2", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + 
"retries":3 + }, + { + "toNode": "ship2", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": "ship2", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": "ship2", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": "ship2", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": "ship2", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": "ship2", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": "ship2", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": "ship2", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": "ship2", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + }, + { + "toNode": "ship2", + "data": ["bash","-c","netstat -an|grep -i listen"], + "method":"CLICommand", + "timeout":3, + "retries":3 + } +] \ No newline at end of file diff --git a/process.go b/process.go index 82c8977..292f939 100644 --- a/process.go +++ b/process.go @@ -40,11 +40,21 @@ type process struct { allowedReceivers map[node]struct{} // methodsAvailable methodsAvailable MethodsAvailable + // TESTING: + // Helper or service function that can do some kind of work + // for the process. + // The idea is that this can hold for example the map of the + // the hello nodes to limit shared resources in the system as + // a whole for sharing a map from the *server level. 
+ procFunc func() error + // The channel to send messages to the procFunc goroutine. + // This is typically used within the methodHandler. + procFuncCh chan Message } // prepareNewProcess will set the the provided values and the default // values for a process. -func newProcess(processes *processes, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []node) process { +func newProcess(processes *processes, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []node, procFunc func() error) process { // create the initial configuration for a sessions communicating with 1 host process. processes.lastProcessID++ @@ -97,12 +107,24 @@ func (p process) spawnWorker(s *server) { // Start a publisher worker, which will start a go routine (process) // That will take care of all the messages for the subject it owns. if p.processKind == processKindPublisher { - p.publishMessages(s) + go p.publishMessages(s) } // Start a subscriber worker, which will start a go routine (process) // That will take care of all the messages for the subject it owns. if p.processKind == processKindSubscriber { + // If there is a procFunc for the process, start it. + if p.procFunc != nil { + p.procFuncCh = make(chan Message) + // Start the procFunc in its own anonymous func so we are able + // to get the return error. + go func() { + err := p.procFunc() + if err != nil { + log.Printf("error: spawnWorker: procFunc failed: %v\n", err) + } + }() + } p.subscribeMessages(s) } } @@ -208,7 +230,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na // method etc. 
switch { case p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK: - log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", p.subject.name()) + // REMOVED: log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", p.subject.name()) mf, ok := p.methodsAvailable.CheckIfExists(message.Method) if !ok { // TODO: Check how errors should be handled here!!! @@ -243,11 +265,11 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na // TESTING: Simulate that we also want to send some error that occured // to the errorCentral { - err := fmt.Errorf("error: some testing error we want to send out") + err := fmt.Errorf("error: some testing error we want to send out from %v", p.node) sendErrorLogMessage(s.newMessagesCh, node(thisNode), err) } case p.subject.CommandOrEvent == CommandNACK || p.subject.CommandOrEvent == EventNACK: - log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", p.subject.name()) + // REMOVED: log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", p.subject.name()) mf, ok := p.methodsAvailable.CheckIfExists(message.Method) if !ok { // TODO: Check how errors should be handled here!!! @@ -298,24 +320,28 @@ func (p process) publishMessages(s *server) { // Increment the counter for the next message to be sent. p.messageID++ + s.processes.mu.Lock() s.processes.active[pn] = p - time.Sleep(time.Second * 1) + s.processes.mu.Unlock() + // REMOVED: sleep + //time.Sleep(time.Second * 1) // NB: simulate that we get an error, and that we can send that // out of the process and receive it in another thread. 
- ep := errProcess{ - infoText: "process failed", - process: p, - message: m, - errorActionCh: make(chan errorAction), - } - s.errorKernel.errorCh <- ep - - // Wait for the response action back from the error kernel, and - // decide what to do. Should we continue, quit, or .... ? - switch <-ep.errorActionCh { - case errActionContinue: - log.Printf("The errAction was continue...so we're continuing\n") - } + // REMOVED: Error simulation + // ep := errProcess{ + // infoText: "process failed", + // process: p, + // message: m, + // errorActionCh: make(chan errorAction), + // } + // s.errorKernel.errorCh <- ep + // + // // Wait for the response action back from the error kernel, and + // // decide what to do. Should we continue, quit, or .... ? + // switch <-ep.errorActionCh { + // case errActionContinue: + // log.Printf("The errAction was continue...so we're continuing\n") + // } } } diff --git a/publisher.go b/publisher.go index 030fd40..2e88a87 100644 --- a/publisher.go +++ b/publisher.go @@ -2,7 +2,6 @@ package steward import ( "log" - "time" ) // processNewMessages takes a database name and an input channel as @@ -87,11 +86,12 @@ func (s *server) processNewMessages(dbFileName string, newSAM chan []subjectAndM log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName) sub := newSubject(sam.Subject.Method, sam.Subject.CommandOrEvent, sam.Subject.ToNode) - proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindPublisher, nil) + proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindPublisher, nil, nil) // fmt.Printf("*** %#v\n", proc) - go proc.spawnWorker(s) + proc.spawnWorker(s) - time.Sleep(time.Millisecond * 500) + // REMOVED: + //time.Sleep(time.Millisecond * 500) s.printProcessesMap() // Now when the process is spawned we jump back to the redo: label, // and send the message to that new process. 
diff --git a/ringbuffer.go b/ringbuffer.go index 2753ce6..d9d159b 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -383,7 +383,8 @@ func (r *ringBuffer) startPermanentStore() { log.Printf("error:failed to write entry: %v\n", err) } - time.Sleep(time.Second * 1) + // REMOVED: time + // time.Sleep(time.Second * 1) } } diff --git a/server.go b/server.go index 08495c9..8c54a1a 100644 --- a/server.go +++ b/server.go @@ -57,12 +57,6 @@ type server struct { errorKernel *errorKernel // metric exporter metrics *metrics - // subscriberServices are where we find the services and the API to - // use services needed by subscriber. - // For example, this can be a service that knows - // how to forward the data for a received message of type log to a - // central logger. - subscriberServices *subscriberServices // Is this the central error logger ? // collection of the publisher services and the types to control them publisherServices *publisherServices @@ -83,7 +77,6 @@ func NewServer(c *Configuration) (*server, error) { processes: newProcesses(), newMessagesCh: make(chan []subjectAndMessage), metrics: newMetrics(c.PromHostAndPort), - subscriberServices: newSubscriberServices(), publisherServices: newPublisherServices(c.PublisherServiceSayhello), centralErrorLogger: c.CentralErrorLogger, } @@ -134,7 +127,7 @@ func (s *server) Start() { // files. s.subscribersStart() - time.Sleep(time.Second * 2) + time.Sleep(time.Second * 1) s.printProcessesMap() // Start the processing of new messaging from an input channel. diff --git a/subscriber-services.go b/subscriber-services.go index 863130f..85cf2a2 100644 --- a/subscriber-services.go +++ b/subscriber-services.go @@ -1,25 +1,26 @@ package steward -// subscriberServices will hold all the helper services needed for -// the different subcribers. Example of a help service can be a log -// subscriber needs a way to write logs locally or send them to some -// other central logging system. 
-type subscriberServices struct { - // sayHelloNodes are the register where the register where nodes - // who have sent an sayHello are stored. Since the sayHello - // subscriber is a handler that will be just be called when a - // hello message is received we need to store the metrics somewhere - // else, that is why we store it here....at least for now. - sayHelloNodes map[node]struct{} -} - -//newSubscriberServices will prepare and return a *subscriberServices -func newSubscriberServices() *subscriberServices { - s := subscriberServices{ - sayHelloNodes: make(map[node]struct{}), - } - - return &s -} - -// --- +// // subscriberServices will hold all the helper services needed for +// // the different subscribers. Example of a help service can be a log +// // subscriber needs a way to write logs locally or send them to some +// // other central logging system. +// type subscriberServices struct { +// // sayHelloNodes is the register where nodes +// // who have sent a sayHello are stored. Since the sayHello +// // subscriber is a handler that will just be called when a +// // hello message is received we need to store the metrics somewhere +// // else, that is why we store it here....at least for now. 
+// sayHelloNodes map[node]struct{} +// } +// +// //newSubscriberServices will prepare and return a *subscriberServices +// func newSubscriberServices() *subscriberServices { +// s := subscriberServices{ +// sayHelloNodes: make(map[node]struct{}), +// } +// +// return &s +// } +// +// // --- +// diff --git a/subscriberMethodTypes.go b/subscriberMethodTypes.go index 067476e..a32bd85 100644 --- a/subscriberMethodTypes.go +++ b/subscriberMethodTypes.go @@ -38,8 +38,6 @@ import ( "os" "os/exec" "path/filepath" - - "github.com/prometheus/client_golang/prometheus" ) // ------------------------------------------------------------ @@ -206,16 +204,11 @@ func (m methodEventSayHello) handler(s *server, proc process, message Message, n log.Printf("<--- Received hello from %v \n", message.FromNode) // Since the handler is only called to handle a specific type of message we need // to store it elsewhere, and choice for now is under s.metrics.sayHelloNodes - s.subscriberServices.sayHelloNodes[message.FromNode] = struct{}{} - // update the prometheus metrics - s.metrics.metricsCh <- metricType{ - metric: prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "hello_nodes", - Help: "The current number of total nodes who have said hello", - }), - value: float64(len(s.subscriberServices.sayHelloNodes)), - } + // send the message to the procFuncCh which is running alongside the process + // and can hold registries and handle special things for an individual process. 
+ proc.procFuncCh <- message + outMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return outMsg, nil } diff --git a/subscribers.go b/subscribers.go index 3caa425..784e145 100644 --- a/subscribers.go +++ b/subscribers.go @@ -2,6 +2,8 @@ package steward import ( "fmt" + + "github.com/prometheus/client_golang/prometheus" ) func (s *server) subscribersStart() { @@ -9,7 +11,7 @@ func (s *server) subscribersStart() { { fmt.Printf("Starting CLICommand subscriber: %#v\n", s.nodeName) sub := newSubject(CLICommand, CommandACK, s.nodeName) - proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"central", "ship2"}) + proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"central", "ship2"}, nil) // fmt.Printf("*** %#v\n", proc) go proc.spawnWorker(s) } @@ -18,7 +20,7 @@ func (s *server) subscribersStart() { { fmt.Printf("Starting textlogging subscriber: %#v\n", s.nodeName) sub := newSubject(TextLogging, EventACK, s.nodeName) - proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}) + proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil) // fmt.Printf("*** %#v\n", proc) go proc.spawnWorker(s) } @@ -27,7 +29,7 @@ func (s *server) subscribersStart() { { fmt.Printf("Starting SayHello subscriber: %#v\n", s.nodeName) sub := newSubject(SayHello, EventNACK, s.nodeName) - proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}) + proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil) // fmt.Printf("*** %#v\n", proc) go proc.spawnWorker(s) } @@ -37,8 +39,23 @@ func (s *server) subscribersStart() { { fmt.Printf("Starting ErrorLog subscriber: %#v\n", s.nodeName) sub := newSubject(ErrorLog, EventNACK, "errorCentral") - proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}) - // 
fmt.Printf("*** %#v\n", proc) + proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil) + proc.procFunc = func() error { + sayHelloNodes := make(map[node]struct{}) + for { + m := <-proc.procFuncCh + sayHelloNodes[m.FromNode] = struct{}{} + + // update the prometheus metrics + s.metrics.metricsCh <- metricType{ + metric: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "hello_nodes", + Help: "The current number of total nodes who have said hello", + }), + value: float64(len(sayHelloNodes)), + } + } + } go proc.spawnWorker(s) } }