Mirror of https://github.com/postmannen/ctrl.git, synced 2024-12-14 12:37:31 +00:00

Commit abf8c0d3c7 (parent 9c9498341c): added procFunc concept, fixed race and timer issues
10 changed files with 304 additions and 73 deletions
@@ -4,7 +4,7 @@ ConfigFolder = "./etc"
 DefaultMessageRetries = 3
 DefaultMessageTimeout = 5
 NodeName = "central"
-ProfilingPort = ""
+ProfilingPort = "6060"
 PromHostAndPort = ":2112"
 PublisherServiceSayhello = 0
 SubscribersDataFolder = "./data"
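The only value change in this hunk is ProfilingPort, which goes from disabled ("") to "6060". Assuming the port is used in the usual Go way, i.e. to expose net/http/pprof on that address (the code that consumes this setting is not part of this diff, so this is an assumption), a minimal sketch of such a listener would be:

package main

import (
    "log"
    "net/http"
    // Importing net/http/pprof registers the /debug/pprof/ handlers
    // on http.DefaultServeMux as a side effect.
    _ "net/http/pprof"
)

func main() {
    profilingPort := "6060" // matches the ProfilingPort value set above

    // Serve the pprof endpoints; profiles would then be reachable at
    // http://localhost:6060/debug/pprof/.
    go func() {
        if err := http.ListenAndServe("localhost:"+profilingPort, nil); err != nil {
            log.Printf("error: pprof listener failed: %v\n", err)
        }
    }()

    select {} // block so the example keeps serving
}

With such a listener running, a CPU profile could be pulled with: go tool pprof http://localhost:6060/debug/pprof/profile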
example/toShip1ManyMessages.json (new file, 100 lines)

@@ -0,0 +1,100 @@
+[
+{
+"toNode": "ship1",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship1",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship1",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship1",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship1",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship1",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship1",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship1",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship1",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship1",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship1",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship1",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship1",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship1",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+}
+]
example/toShip2ManyMessages.json (new file, 100 lines)

@@ -0,0 +1,100 @@
+[
+{
+"toNode": "ship2",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship2",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship2",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship2",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship2",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship2",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship2",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship2",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship2",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship2",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship2",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship2",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship2",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+},
+{
+"toNode": "ship2",
+"data": ["bash","-c","netstat -an|grep -i listen"],
+"method":"CLICommand",
+"timeout":3,
+"retries":3
+}
+]
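Both example files are just the same seven-line message object repeated fourteen times, once per message to queue up. A throwaway generator like the one below could have produced them; the exampleMessage struct and its JSON tags simply mirror the keys used above (toNode, data, method, timeout, retries) and are not the repository's own Message type, so treat the names as illustrative.

package main

import (
    "encoding/json"
    "log"
    "os"
)

// exampleMessage mirrors the JSON keys in the example files above.
// It is a stand-in for illustration, not the repository's Message type.
type exampleMessage struct {
    ToNode  string   `json:"toNode"`
    Data    []string `json:"data"`
    Method  string   `json:"method"`
    Timeout int      `json:"timeout"`
    Retries int      `json:"retries"`
}

func main() {
    m := exampleMessage{
        ToNode:  "ship1",
        Data:    []string{"bash", "-c", "netstat -an|grep -i listen"},
        Method:  "CLICommand",
        Timeout: 3,
        Retries: 3,
    }

    // Repeat the same message 14 times, matching the example files.
    msgs := make([]exampleMessage, 14)
    for i := range msgs {
        msgs[i] = m
    }

    b, err := json.MarshalIndent(msgs, "", "    ")
    if err != nil {
        log.Fatalf("error: marshal failed: %v\n", err)
    }
    if err := os.WriteFile("toShip1ManyMessages.json", b, 0o644); err != nil {
        log.Fatalf("error: write failed: %v\n", err)
    }
}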
process.go

@@ -40,11 +40,21 @@ type process struct {
 	allowedReceivers map[node]struct{}
 	// methodsAvailable
 	methodsAvailable MethodsAvailable
+	// TESTING:
+	// Helper or service function that can do some kind of work
+	// for the process.
+	// The idea is that this can hold for example the map of the
+	// the hello nodes to limit shared resources in the system as
+	// a whole for sharing a map from the *server level.
+	procFunc func() error
+	// The channel to send a messages to the procFunc go routine.
+	// This is typically used within the methodHandler.
+	procFuncCh chan Message
 }

 // prepareNewProcess will set the the provided values and the default
 // values for a process.
-func newProcess(processes *processes, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []node) process {
+func newProcess(processes *processes, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []node, procFunc func() error) process {
 	// create the initial configuration for a sessions communicating with 1 host process.
 	processes.lastProcessID++
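The two new fields are the core of the procFunc concept: a process may carry an optional long-lived helper function, and procFuncCh is the channel a message handler uses to hand work to it, so state such as the hello-nodes map no longer has to be shared at the server level. A minimal sketch of the idea; the message and process types here are stand-ins for illustration, not the repository's own:

package main

import (
    "fmt"
    "time"
)

// Stand-in types; the real Message and process types live in the repository.
type message struct {
    FromNode string
}

type process struct {
    // procFunc is an optional long-lived helper owned by this process.
    procFunc func() error
    // procFuncCh is how a message handler hands messages to procFunc.
    procFuncCh chan message
}

func main() {
    p := process{procFuncCh: make(chan message)}

    // The helper keeps its state local to the process instead of in a
    // map shared from the server level.
    p.procFunc = func() error {
        for m := range p.procFuncCh {
            fmt.Printf("procFunc got a message from %v\n", m.FromNode)
        }
        return nil
    }

    // In the real code spawnWorker starts this goroutine.
    go p.procFunc()

    // A method handler would do this when a message arrives.
    p.procFuncCh <- message{FromNode: "ship1"}
    time.Sleep(100 * time.Millisecond)
}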
@@ -97,12 +107,24 @@ func (p process) spawnWorker(s *server) {
 	// Start a publisher worker, which will start a go routine (process)
 	// That will take care of all the messages for the subject it owns.
 	if p.processKind == processKindPublisher {
-		p.publishMessages(s)
+		go p.publishMessages(s)
 	}

 	// Start a subscriber worker, which will start a go routine (process)
 	// That will take care of all the messages for the subject it owns.
 	if p.processKind == processKindSubscriber {
+		// If there is a procFunc for the process, start it.
+		if p.procFunc != nil {
+			p.procFuncCh = make(chan Message)
+			// Start the procFunc in it's own anonymous func so we are able
+			// to get the return error.
+			go func() {
+				err := p.procFunc()
+				if err != nil {
+					log.Printf("error: spawnWorker: procFunc failed: %v\n", err)
+				}
+			}()
+		}
 		p.subscribeMessages(s)
 	}
 }
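The anonymous function wrapping p.procFunc() exists because a bare "go p.procFunc()" would discard the returned error; the wrapper is what lets the goroutine log it. The same pattern in isolation, as a small sketch:

package main

import (
    "errors"
    "log"
    "time"
)

func main() {
    work := func() error {
        return errors.New("something went wrong")
    }

    // "go work()" would throw the error away; wrapping the call lets us
    // capture and log the return value inside the goroutine.
    go func() {
        if err := work(); err != nil {
            log.Printf("error: worker failed: %v\n", err)
        }
    }()

    time.Sleep(100 * time.Millisecond) // give the goroutine time to log
}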
@@ -208,7 +230,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
 	// method etc.
 	switch {
 	case p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK:
-		log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", p.subject.name())
+		// REMOVED: log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", p.subject.name())
 		mf, ok := p.methodsAvailable.CheckIfExists(message.Method)
 		if !ok {
 			// TODO: Check how errors should be handled here!!!
@@ -243,11 +265,11 @@
 		// TESTING: Simulate that we also want to send some error that occured
 		// to the errorCentral
 		{
-			err := fmt.Errorf("error: some testing error we want to send out")
+			err := fmt.Errorf("error: some testing error we want to send out from %v", p.node)
 			sendErrorLogMessage(s.newMessagesCh, node(thisNode), err)
 		}
 	case p.subject.CommandOrEvent == CommandNACK || p.subject.CommandOrEvent == EventNACK:
-		log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", p.subject.name())
+		// REMOVED: log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", p.subject.name())
 		mf, ok := p.methodsAvailable.CheckIfExists(message.Method)
 		if !ok {
 			// TODO: Check how errors should be handled here!!!
@@ -298,24 +320,28 @@ func (p process) publishMessages(s *server) {

 		// Increment the counter for the next message to be sent.
 		p.messageID++
 		s.processes.mu.Lock()
 		s.processes.active[pn] = p
-		time.Sleep(time.Second * 1)
 		s.processes.mu.Unlock()
+		// REMOVED: sleep
+		//time.Sleep(time.Second * 1)

-		// NB: simulate that we get an error, and that we can send that
-		// out of the process and receive it in another thread.
-		ep := errProcess{
-			infoText: "process failed",
-			process: p,
-			message: m,
-			errorActionCh: make(chan errorAction),
-		}
-		s.errorKernel.errorCh <- ep
-
-		// Wait for the response action back from the error kernel, and
-		// decide what to do. Should we continue, quit, or .... ?
-		switch <-ep.errorActionCh {
-		case errActionContinue:
-			log.Printf("The errAction was continue...so we're continuing\n")
-		}
+		// REMOVED: Error simulation
+		// ep := errProcess{
+		// infoText: "process failed",
+		// process: p,
+		// message: m,
+		// errorActionCh: make(chan errorAction),
+		// }
+		// s.errorKernel.errorCh <- ep
+		//
+		// // Wait for the response action back from the error kernel, and
+		// // decide what to do. Should we continue, quit, or .... ?
+		// switch <-ep.errorActionCh {
+		// case errActionContinue:
+		// log.Printf("The errAction was continue...so we're continuing\n")
+		// }
 	}
 }
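Two of the fixes named in the commit message show up in this hunk: the one-second sleep inside the s.processes.mu critical section is gone, and the simulated error is commented out. Holding a mutex across a sleep stalls every other goroutine that needs the same lock, which is the race and timer problem being fixed. A small sketch with stand-in types showing the before and after shape of that change:

package main

import (
    "sync"
    "time"
)

type registry struct {
    mu     sync.Mutex
    active map[string]int
}

// badUpdate holds the lock across the sleep, so every other goroutine
// that wants the map is blocked for the whole second.
func badUpdate(r *registry, name string) {
    r.mu.Lock()
    r.active[name]++
    time.Sleep(time.Second * 1)
    r.mu.Unlock()
}

// goodUpdate keeps the critical section as small as possible; any waiting
// happens outside the lock (or, as in the commit, is removed entirely).
func goodUpdate(r *registry, name string) {
    r.mu.Lock()
    r.active[name]++
    r.mu.Unlock()
}

func main() {
    r := &registry{active: map[string]int{}}
    goodUpdate(r, "ship1")
}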
@@ -2,7 +2,6 @@ package steward

 import (
 	"log"
-	"time"
 )

 // processNewMessages takes a database name and an input channel as
@@ -87,11 +86,12 @@ func (s *server) processNewMessages(dbFileName string, newSAM chan []subjectAndM
 			log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName)

 			sub := newSubject(sam.Subject.Method, sam.Subject.CommandOrEvent, sam.Subject.ToNode)
-			proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindPublisher, nil)
+			proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindPublisher, nil, nil)
 			// fmt.Printf("*** %#v\n", proc)
-			go proc.spawnWorker(s)
+			proc.spawnWorker(s)

-			time.Sleep(time.Millisecond * 500)
+			// REMOVED:
+			//time.Sleep(time.Millisecond * 500)
 			s.printProcessesMap()
 			// Now when the process is spawned we jump back to the redo: label,
 			// and send the message to that new process.
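The change here makes spawnWorker a plain call (the worker now starts its own goroutines internally, as the spawnWorker hunk above shows) and drops the half-second sleep that papered over the startup race. The surrounding logic, referenced by the redo: comment, is a lookup-or-spawn loop: find the process for a subject, create and register one if it is missing, then jump back and deliver the message to it. A rough sketch of that loop; subject, proc and deliver are hypothetical names, not the repository's:

package main

import (
    "fmt"
    "time"
)

// Hypothetical stand-ins for the real subject and process types.
type subject string

type proc struct{ in chan string }

func main() {
    active := map[subject]proc{}

    deliver := func(sub subject, msg string) {
    redo:
        p, ok := active[sub]
        if !ok {
            // No process owns this subject yet: create one, register it,
            // then jump back to redo and send the message to it.
            p = proc{in: make(chan string, 1)}
            active[sub] = p
            go func() {
                for m := range p.in {
                    fmt.Printf("%v handled: %v\n", sub, m)
                }
            }()
            goto redo
        }
        p.in <- msg
    }

    deliver("ship1.CLICommand", "netstat -an|grep -i listen")
    deliver("ship1.CLICommand", "uptime")
    time.Sleep(100 * time.Millisecond) // let the worker print before exit
}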
@@ -383,7 +383,8 @@ func (r *ringBuffer) startPermanentStore() {
 			log.Printf("error:failed to write entry: %v\n", err)
 		}

-		time.Sleep(time.Second * 1)
+		// REMOVED: time
+		// time.Sleep(time.Second * 1)
 	}

 }
@@ -57,12 +57,6 @@ type server struct {
 	errorKernel *errorKernel
 	// metric exporter
 	metrics *metrics
-	// subscriberServices are where we find the services and the API to
-	// use services needed by subscriber.
-	// For example, this can be a service that knows
-	// how to forward the data for a received message of type log to a
-	// central logger.
-	subscriberServices *subscriberServices
 	// Is this the central error logger ?
 	// collection of the publisher services and the types to control them
 	publisherServices *publisherServices
@@ -83,7 +77,6 @@ func NewServer(c *Configuration) (*server, error) {
 		processes: newProcesses(),
 		newMessagesCh: make(chan []subjectAndMessage),
 		metrics: newMetrics(c.PromHostAndPort),
-		subscriberServices: newSubscriberServices(),
 		publisherServices: newPublisherServices(c.PublisherServiceSayhello),
 		centralErrorLogger: c.CentralErrorLogger,
 	}
@@ -134,7 +127,7 @@ func (s *server) Start() {
 	// files.
 	s.subscribersStart()

-	time.Sleep(time.Second * 2)
+	time.Sleep(time.Second * 1)
 	s.printProcessesMap()

 	// Start the processing of new messaging from an input channel.
@@ -1,25 +1,26 @@
 package steward

-// subscriberServices will hold all the helper services needed for
-// the different subcribers. Example of a help service can be a log
-// subscriber needs a way to write logs locally or send them to some
-// other central logging system.
-type subscriberServices struct {
-	// sayHelloNodes are the register where the register where nodes
-	// who have sent an sayHello are stored. Since the sayHello
-	// subscriber is a handler that will be just be called when a
-	// hello message is received we need to store the metrics somewhere
-	// else, that is why we store it here....at least for now.
-	sayHelloNodes map[node]struct{}
-}
-
-//newSubscriberServices will prepare and return a *subscriberServices
-func newSubscriberServices() *subscriberServices {
-	s := subscriberServices{
-		sayHelloNodes: make(map[node]struct{}),
-	}
-
-	return &s
-}
-
-// ---
+// // subscriberServices will hold all the helper services needed for
+// // the different subcribers. Example of a help service can be a log
+// // subscriber needs a way to write logs locally or send them to some
+// // other central logging system.
+// type subscriberServices struct {
+// // sayHelloNodes are the register where the register where nodes
+// // who have sent an sayHello are stored. Since the sayHello
+// // subscriber is a handler that will be just be called when a
+// // hello message is received we need to store the metrics somewhere
+// // else, that is why we store it here....at least for now.
+// sayHelloNodes map[node]struct{}
+// }
+//
+// //newSubscriberServices will prepare and return a *subscriberServices
+// func newSubscriberServices() *subscriberServices {
+// s := subscriberServices{
+// sayHelloNodes: make(map[node]struct{}),
+// }
+//
+// return &s
+// }
+//
+// // ---
+//
@@ -38,8 +38,6 @@ import(
 	"os"
 	"os/exec"
 	"path/filepath"
-
-	"github.com/prometheus/client_golang/prometheus"
 )

 // ------------------------------------------------------------
@@ -206,16 +204,11 @@ func (m methodEventSayHello) handler(s *server, proc process, message Message, n
 	log.Printf("<--- Received hello from %v \n", message.FromNode)
-	// Since the handler is only called to handle a specific type of message we need
-	// to store it elsewhere, and choice for now is under s.metrics.sayHelloNodes
-	s.subscriberServices.sayHelloNodes[message.FromNode] = struct{}{}
-
-	// update the prometheus metrics
-	s.metrics.metricsCh <- metricType{
-		metric: prometheus.NewGauge(prometheus.GaugeOpts{
-			Name: "hello_nodes",
-			Help: "The current number of total nodes who have said hello",
-		}),
-		value: float64(len(s.subscriberServices.sayHelloNodes)),
-	}
+	// send the message to the procFuncCh which is running alongside the process
+	// and can hold registries and handle special things for an individual process.
+	proc.procFuncCh <- message

 	outMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
 	return outMsg, nil
 }
@@ -2,6 +2,8 @@ package steward

 import (
 	"fmt"
+
+	"github.com/prometheus/client_golang/prometheus"
 )

 func (s *server) subscribersStart() {
@@ -9,7 +11,7 @@ func (s *server) subscribersStart() {
 	{
 		fmt.Printf("Starting CLICommand subscriber: %#v\n", s.nodeName)
 		sub := newSubject(CLICommand, CommandACK, s.nodeName)
-		proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"central", "ship2"})
+		proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"central", "ship2"}, nil)
 		// fmt.Printf("*** %#v\n", proc)
 		go proc.spawnWorker(s)
 	}
@@ -18,7 +20,7 @@ func (s *server) subscribersStart() {
 	{
 		fmt.Printf("Starting textlogging subscriber: %#v\n", s.nodeName)
 		sub := newSubject(TextLogging, EventACK, s.nodeName)
-		proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"})
+		proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil)
 		// fmt.Printf("*** %#v\n", proc)
 		go proc.spawnWorker(s)
 	}
@@ -27,7 +29,7 @@ func (s *server) subscribersStart() {
 	{
 		fmt.Printf("Starting SayHello subscriber: %#v\n", s.nodeName)
 		sub := newSubject(SayHello, EventNACK, s.nodeName)
-		proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"})
+		proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil)
 		// fmt.Printf("*** %#v\n", proc)
 		go proc.spawnWorker(s)
 	}
@@ -37,8 +39,23 @@
 	{
 		fmt.Printf("Starting ErrorLog subscriber: %#v\n", s.nodeName)
 		sub := newSubject(ErrorLog, EventNACK, "errorCentral")
-		proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"})
-		// fmt.Printf("*** %#v\n", proc)
+		proc := newProcess(s.processes, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil)
+		proc.procFunc = func() error {
+			sayHelloNodes := make(map[node]struct{})
+			for {
+				m := <-proc.procFuncCh
+				sayHelloNodes[m.FromNode] = struct{}{}

+				// update the prometheus metrics
+				s.metrics.metricsCh <- metricType{
+					metric: prometheus.NewGauge(prometheus.GaugeOpts{
+						Name: "hello_nodes",
+						Help: "The current number of total nodes who have said hello",
+					}),
+					value: float64(len(sayHelloNodes)),
+				}
+			}
+		}
 		go proc.spawnWorker(s)
 	}
 }
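Wired together with the methodEventSayHello change earlier in this diff, the flow is: the handler simply forwards each hello message into proc.procFuncCh, and the procFunc attached to the subscriber process consumes it, keeps the sayHelloNodes registry, and updates the metric. A condensed, runnable sketch of that producer and consumer pairing; the types are stand-ins and the prometheus gauge update is replaced by a print:

package main

import (
    "fmt"
    "time"
)

// Stand-in types; the real node, Message and process types live in the repository.
type node string

type message struct {
    FromNode node
}

type process struct {
    procFunc   func() error
    procFuncCh chan message
}

func main() {
    proc := process{procFuncCh: make(chan message)}

    // procFunc side: owns the sayHelloNodes registry for this one process,
    // so nothing has to be shared at the server level any more.
    proc.procFunc = func() error {
        sayHelloNodes := make(map[node]struct{})
        for m := range proc.procFuncCh {
            sayHelloNodes[m.FromNode] = struct{}{}
            // The real code pushes a prometheus gauge value here instead.
            fmt.Printf("hello_nodes = %d\n", len(sayHelloNodes))
        }
        return nil
    }
    go proc.procFunc()

    // Handler side: methodEventSayHello just forwards the message.
    sayHelloHandler := func(m message) {
        proc.procFuncCh <- m
    }

    sayHelloHandler(message{FromNode: "ship1"})
    sayHelloHandler(message{FromNode: "ship2"})
    sayHelloHandler(message{FromNode: "ship1"}) // repeat hellos count once

    time.Sleep(100 * time.Millisecond)
}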