1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

Start proc via message, moved nats.conn into process

This commit is contained in:
postmannen 2021-04-07 16:45:51 +02:00
parent 92addf50bc
commit d909784094
9 changed files with 91 additions and 54 deletions

View file

@ -2,7 +2,7 @@
{
"directory": "metrics/network/sniffer",
"fileExtension": ".html",
"toNode": "ship1",
"toNode": "ship2",
"data": ["http://vg.no"],
"method":"REQHttpGet",
"timeout":5,

View file

@ -2,7 +2,7 @@
{
"directory":"opcommand_logs",
"fileExtension": ".log",
"toNode": "ship1",
"toNode": "ship2",
"data": ["ps"],
"method":"REQOpCommand",
"timeout":3,

View file

@ -0,0 +1,14 @@
[
{
"directory":"opcommand_logs",
"fileExtension": ".log",
"toNode": "ship2",
"data": ["startProc","REQHttpGet","central"],
"method":"REQOpCommand",
"timeout":3,
"retries":3,
"requestTimeout":3,
"requestRetries":3,
"MethodTimeout": 7
}
]

View file

@ -92,7 +92,7 @@ func newSubject(method Method, node string) Subject {
ma := method.GetMethodsAvailable()
coe, ok := ma.methodhandlers[method]
if !ok {
log.Printf("error: no CommandOrEvent type specified for the method\n")
log.Printf("error: no CommandOrEvent type specified for the method: %v\n", method)
os.Exit(1)
}

View file

@ -58,11 +58,13 @@ type process struct {
toRingbufferCh chan<- []subjectAndMessage
// The structure who holds all processes information
processes *processes
// nats connection
natsConn *nats.Conn
}
// prepareNewProcess will set the the provided values and the default
// values for a process.
func newProcess(processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []node, procFunc func() error) process {
func newProcess(natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []node, procFunc func() error) process {
// create the initial configuration for a sessions communicating with 1 host process.
processes.lastProcessID++
@ -86,6 +88,7 @@ func newProcess(processes *processes, toRingbufferCh chan<- []subjectAndMessage,
toRingbufferCh: toRingbufferCh,
configuration: configuration,
processes: processes,
natsConn: natsConn,
}
return proc
@ -108,7 +111,7 @@ type procFunc func() error
//
// It will give the process the next available ID, and also add the
// process to the processes map in the server structure.
func (p process) spawnWorker(s *server) {
func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
// We use the full name of the subject to identify a unique
// process. We can do that since a process can only handle
// one message queue.
@ -121,9 +124,9 @@ func (p process) spawnWorker(s *server) {
}
// Add information about the new process to the started processes map.
s.processes.mu.Lock()
s.processes.active[pn] = p
s.processes.mu.Unlock()
procs.mu.Lock()
procs.active[pn] = p
procs.mu.Unlock()
// Start a publisher worker, which will start a go routine (process)
// That will take care of all the messages for the subject it owns.
@ -143,7 +146,7 @@ func (p process) spawnWorker(s *server) {
}()
}
go p.publishMessages(s.natsConn, s.processes)
go p.publishMessages(natsConn, procs)
}
// Start a subscriber worker, which will start a go routine (process)
@ -163,7 +166,7 @@ func (p process) spawnWorker(s *server) {
}()
}
p.subscribeMessages(s)
p.subscribeMessages()
}
}
@ -256,7 +259,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
// the state of the message being processed, and then reply back to the
// correct sending process's reply, meaning so we ACK back to the correct
// publisher.
func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *nats.Msg, s *server) {
func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *nats.Msg) {
message := Message{}
@ -267,7 +270,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
err := gobDec.Decode(&message)
if err != nil {
er := fmt.Errorf("error: gob decoding failed: %v", err)
sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er)
sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er)
}
switch {
@ -275,7 +278,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
mh, ok := p.methodsAvailable.CheckIfExists(message.Method)
if !ok {
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent)
sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er)
sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er)
}
out := []byte("not allowed from " + message.FromNode)
@ -294,11 +297,11 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
if err != nil {
er := fmt.Errorf("error: subscriberHandler: failed to execute event: %v", err)
sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er)
sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er)
}
} else {
er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject)
sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er)
sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er)
}
// Send a confirmation message back to the publisher
@ -308,7 +311,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
mf, ok := p.methodsAvailable.CheckIfExists(message.Method)
if !ok {
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent)
sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er)
sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er)
}
// Check if we are allowed to receive from that host
@ -328,30 +331,30 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
if err != nil {
er := fmt.Errorf("error: subscriberHandler: failed to execute event: %v", err)
sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er)
sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er)
}
} else {
er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject)
sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er)
sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er)
}
// ---
default:
er := fmt.Errorf("info: did not find that specific type of command: %#v", p.subject.CommandOrEvent)
sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er)
sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er)
}
}
// Subscribe will start up a Go routine under the hood calling the
// callback function specified when a new message is received.
func (p process) subscribeMessages(s *server) {
func (p process) subscribeMessages() {
subject := string(p.subject.name())
_, err := s.natsConn.Subscribe(subject, func(msg *nats.Msg) {
_, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) {
// We start one handler per message received by using go routines here.
// This is for being able to reply back the current publisher who sent
// the message.
go p.subscriberHandler(s.natsConn, s.nodeName, msg, s)
go p.subscriberHandler(p.natsConn, p.configuration.NodeName, msg)
})
if err != nil {
log.Printf("error: Subscribe failed: %v\n", err)

View file

@ -277,9 +277,9 @@ func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subject
log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName)
sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil, nil)
proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil, nil)
// fmt.Printf("*** %#v\n", proc)
proc.spawnWorker(s)
proc.spawnWorker(s.processes, s.natsConn)
// REMOVED:
//time.Sleep(time.Millisecond * 500)

View file

@ -15,8 +15,8 @@ func (s *server) ProcessesStart() {
{
fmt.Printf("Starting REQOpCommand subscriber: %#v\n", s.nodeName)
sub := newSubject(REQOpCommand, s.nodeName)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil)
go proc.spawnWorker(s)
proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil)
go proc.spawnWorker(s.processes, s.natsConn)
}
// Start a subscriber for textLogging messages
@ -24,9 +24,9 @@ func (s *server) ProcessesStart() {
{
fmt.Printf("Starting text logging subscriber: %#v\n", s.nodeName)
sub := newSubject(REQTextToLogFile, s.nodeName)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQTextToLogFile.Values, nil)
proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQTextToLogFile.Values, nil)
// fmt.Printf("*** %#v\n", proc)
go proc.spawnWorker(s)
go proc.spawnWorker(s.processes, s.natsConn)
}
}
@ -35,9 +35,9 @@ func (s *server) ProcessesStart() {
{
fmt.Printf("Starting text to file subscriber: %#v\n", s.nodeName)
sub := newSubject(REQTextToFile, s.nodeName)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQTextToFile.Values, nil)
proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQTextToFile.Values, nil)
// fmt.Printf("*** %#v\n", proc)
go proc.spawnWorker(s)
go proc.spawnWorker(s.processes, s.natsConn)
}
}
@ -46,7 +46,7 @@ func (s *server) ProcessesStart() {
{
fmt.Printf("Starting Hello subscriber: %#v\n", s.nodeName)
sub := newSubject(REQHello, s.nodeName)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQHello.Values, nil)
proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQHello.Values, nil)
proc.procFuncCh = make(chan Message)
// The reason for running the say hello subscriber as a procFunc is that
@ -72,7 +72,7 @@ func (s *server) ProcessesStart() {
}
}
}
go proc.spawnWorker(s)
go proc.spawnWorker(s.processes, s.natsConn)
}
}
@ -81,8 +81,8 @@ func (s *server) ProcessesStart() {
{
fmt.Printf("Starting REQErrorLog subscriber: %#v\n", s.nodeName)
sub := newSubject(REQErrorLog, "errorCentral")
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQErrorLog.Values, nil)
go proc.spawnWorker(s)
proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQErrorLog.Values, nil)
go proc.spawnWorker(s.processes, s.natsConn)
}
}
@ -91,8 +91,8 @@ func (s *server) ProcessesStart() {
{
fmt.Printf("Starting Ping Request subscriber: %#v\n", s.nodeName)
sub := newSubject(REQPing, s.nodeName)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQPing.Values, nil)
go proc.spawnWorker(s)
proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQPing.Values, nil)
go proc.spawnWorker(s.processes, s.natsConn)
}
}
@ -101,8 +101,8 @@ func (s *server) ProcessesStart() {
{
fmt.Printf("Starting Pong subscriber: %#v\n", s.nodeName)
sub := newSubject(REQPong, s.nodeName)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQPong.Values, nil)
go proc.spawnWorker(s)
proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQPong.Values, nil)
go proc.spawnWorker(s.processes, s.natsConn)
}
}
@ -111,8 +111,8 @@ func (s *server) ProcessesStart() {
{
fmt.Printf("Starting CLICommand Request subscriber: %#v\n", s.nodeName)
sub := newSubject(REQCliCommand, s.nodeName)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQCliCommand.Values, nil)
go proc.spawnWorker(s)
proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQCliCommand.Values, nil)
go proc.spawnWorker(s.processes, s.natsConn)
}
}
@ -121,8 +121,8 @@ func (s *server) ProcessesStart() {
{
fmt.Printf("Starting CLICommand Not Sequential Request subscriber: %#v\n", s.nodeName)
sub := newSubject(REQnCliCommand, s.nodeName)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQnCliCommand.Values, nil)
go proc.spawnWorker(s)
proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQnCliCommand.Values, nil)
go proc.spawnWorker(s.processes, s.natsConn)
}
}
@ -131,8 +131,8 @@ func (s *server) ProcessesStart() {
{
fmt.Printf("Starting Text To Console subscriber: %#v\n", s.nodeName)
sub := newSubject(REQTextToConsole, s.nodeName)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQTextToConsole.Values, nil)
go proc.spawnWorker(s)
proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQTextToConsole.Values, nil)
go proc.spawnWorker(s.processes, s.natsConn)
}
}
@ -146,7 +146,7 @@ func (s *server) ProcessesStart() {
fmt.Printf("Starting Hello Publisher: %#v\n", s.nodeName)
sub := newSubject(REQHello, s.configuration.CentralNodeName)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, []node{}, nil)
proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, []node{}, nil)
// Define the procFunc to be used for the process.
proc.procFunc = procFunc(
@ -172,7 +172,7 @@ func (s *server) ProcessesStart() {
time.Sleep(time.Second * time.Duration(s.configuration.StartPubREQHello))
}
})
go proc.spawnWorker(s)
go proc.spawnWorker(s.processes, s.natsConn)
}
// Start a subscriber for Http Get Requests
@ -180,9 +180,9 @@ func (s *server) ProcessesStart() {
{
fmt.Printf("Starting Http Get subscriber: %#v\n", s.nodeName)
sub := newSubject(REQHttpGet, s.nodeName)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQHttpGet.Values, nil)
proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQHttpGet.Values, nil)
// fmt.Printf("*** %#v\n", proc)
go proc.spawnWorker(s)
go proc.spawnWorker(s.processes, s.natsConn)
}
}
}

View file

@ -215,7 +215,7 @@ func (m methodREQOpCommand) getKind() CommandOrEvent {
// handler to run a CLI command with timeout context. The handler will
// return the output of the command run back to the calling publisher
// in the ack message.
func (m methodREQOpCommand) handler(proc process, message Message, node string) ([]byte, error) {
func (m methodREQOpCommand) handler(proc process, message Message, nodeName string) ([]byte, error) {
go func() {
out := []byte{}
@ -232,6 +232,26 @@ func (m methodREQOpCommand) handler(proc process, message Message, node string)
}
proc.processes.mu.Unlock()
case message.Data[0] == "startProc":
if len(message.Data) < 2 {
er := fmt.Errorf("error: startProc: no allowed publisher nodes specified: %v" + fmt.Sprint(message))
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
return
}
newMethod := Method(message.Data[1])
// We need to convert the []string to []node
aps := message.Data[2:]
var allowedPublishers []node
for _, v := range aps {
allowedPublishers = append(allowedPublishers, node(v))
}
sub := newSubject(newMethod, proc.configuration.NodeName)
procNew := newProcess(proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, allowedPublishers, nil)
go procNew.spawnWorker(proc.processes, proc.natsConn)
default:
er := fmt.Errorf("error: no such OpCommand specified: " + message.Data[0])
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
@ -243,7 +263,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, node string)
newReplyMessage(proc, message, REQTextToLogFile, out)
}()
ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n", node, message.ID))
ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n", proc.node, message.ID))
return ackMsg, nil
}

View file

@ -1,4 +1,4 @@
2021-04-07 06:11:54.599555 +0000 UTC, error: subReply.NextMsg failed for node=ship2, subject=ship2.REQHttpGet.EventACK: nats: timeout
2021-04-07 06:11:59.601342 +0000 UTC, error: subReply.NextMsg failed for node=ship2, subject=ship2.REQHttpGet.EventACK: nats: timeout
2021-04-07 06:12:04.603312 +0000 UTC, error: subReply.NextMsg failed for node=ship2, subject=ship2.REQHttpGet.EventACK: nats: timeout
2021-04-07 06:12:04.603423 +0000 UTC, info: max retries for message reached, breaking out: {ship2 1 [http://vg.no] REQHttpGet central 5 3 0 0 5 metrics/network/sniffer .html <nil> 0xc0006dc000}
2021-04-07 14:14:13.750212 +0000 UTC, error: subReply.NextMsg failed for node=ship2, subject=ship2.REQHttpGet.EventACK: nats: timeout
2021-04-07 14:14:18.755501 +0000 UTC, error: subReply.NextMsg failed for node=ship2, subject=ship2.REQHttpGet.EventACK: nats: timeout
2021-04-07 14:14:23.760201 +0000 UTC, error: subReply.NextMsg failed for node=ship2, subject=ship2.REQHttpGet.EventACK: nats: timeout
2021-04-07 14:14:23.760319 +0000 UTC, info: max retries for message reached, breaking out: {ship2 5 [http://vg.no] REQHttpGet central 5 3 0 0 5 metrics/network/sniffer .html <nil> 0xc00042e240}