diff --git a/example/toShip1-REQHttpGet.json b/example/toShip1-REQHttpGet.json index d919119..ec3271d 100644 --- a/example/toShip1-REQHttpGet.json +++ b/example/toShip1-REQHttpGet.json @@ -2,7 +2,7 @@ { "directory": "metrics/network/sniffer", "fileExtension": ".html", - "toNode": "ship1", + "toNode": "ship2", "data": ["http://vg.no"], "method":"REQHttpGet", "timeout":5, diff --git a/example/toShip1-REQOpCommand.json b/example/toShip1-REQOpCommand.json index 84d3f45..d3d61e3 100644 --- a/example/toShip1-REQOpCommand.json +++ b/example/toShip1-REQOpCommand.json @@ -2,7 +2,7 @@ { "directory":"opcommand_logs", "fileExtension": ".log", - "toNode": "ship1", + "toNode": "ship2", "data": ["ps"], "method":"REQOpCommand", "timeout":3, diff --git a/example/toShip1-REQOpCommandStart.json b/example/toShip1-REQOpCommandStart.json new file mode 100644 index 0000000..c303f19 --- /dev/null +++ b/example/toShip1-REQOpCommandStart.json @@ -0,0 +1,14 @@ +[ + { + "directory":"opcommand_logs", + "fileExtension": ".log", + "toNode": "ship2", + "data": ["startProc","REQHttpGet","central"], + "method":"REQOpCommand", + "timeout":3, + "retries":3, + "requestTimeout":3, + "requestRetries":3, + "MethodTimeout": 7 + } +] \ No newline at end of file diff --git a/message_and_subject.go b/message_and_subject.go index b95c01a..7956590 100644 --- a/message_and_subject.go +++ b/message_and_subject.go @@ -92,7 +92,7 @@ func newSubject(method Method, node string) Subject { ma := method.GetMethodsAvailable() coe, ok := ma.methodhandlers[method] if !ok { - log.Printf("error: no CommandOrEvent type specified for the method\n") + log.Printf("error: no CommandOrEvent type specified for the method: %v\n", method) os.Exit(1) } diff --git a/process.go b/process.go index b4e4e99..c20ab95 100644 --- a/process.go +++ b/process.go @@ -58,11 +58,13 @@ type process struct { toRingbufferCh chan<- []subjectAndMessage // The structure who holds all processes information processes *processes + // nats connection + natsConn *nats.Conn } // prepareNewProcess will set the the provided values and the default // values for a process. -func newProcess(processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []node, procFunc func() error) process { +func newProcess(natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []node, procFunc func() error) process { // create the initial configuration for a sessions communicating with 1 host process. processes.lastProcessID++ @@ -86,6 +88,7 @@ func newProcess(processes *processes, toRingbufferCh chan<- []subjectAndMessage, toRingbufferCh: toRingbufferCh, configuration: configuration, processes: processes, + natsConn: natsConn, } return proc @@ -108,7 +111,7 @@ type procFunc func() error // // It will give the process the next available ID, and also add the // process to the processes map in the server structure. -func (p process) spawnWorker(s *server) { +func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) { // We use the full name of the subject to identify a unique // process. We can do that since a process can only handle // one message queue. @@ -121,9 +124,9 @@ func (p process) spawnWorker(s *server) { } // Add information about the new process to the started processes map. - s.processes.mu.Lock() - s.processes.active[pn] = p - s.processes.mu.Unlock() + procs.mu.Lock() + procs.active[pn] = p + procs.mu.Unlock() // Start a publisher worker, which will start a go routine (process) // That will take care of all the messages for the subject it owns. @@ -143,7 +146,7 @@ func (p process) spawnWorker(s *server) { }() } - go p.publishMessages(s.natsConn, s.processes) + go p.publishMessages(natsConn, procs) } // Start a subscriber worker, which will start a go routine (process) @@ -163,7 +166,7 @@ func (p process) spawnWorker(s *server) { }() } - p.subscribeMessages(s) + p.subscribeMessages() } } @@ -256,7 +259,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) { // the state of the message being processed, and then reply back to the // correct sending process's reply, meaning so we ACK back to the correct // publisher. -func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *nats.Msg, s *server) { +func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *nats.Msg) { message := Message{} @@ -267,7 +270,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na err := gobDec.Decode(&message) if err != nil { er := fmt.Errorf("error: gob decoding failed: %v", err) - sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er) + sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er) } switch { @@ -275,7 +278,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na mh, ok := p.methodsAvailable.CheckIfExists(message.Method) if !ok { er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent) - sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er) + sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er) } out := []byte("not allowed from " + message.FromNode) @@ -294,11 +297,11 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na if err != nil { er := fmt.Errorf("error: subscriberHandler: failed to execute event: %v", err) - sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er) + sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er) } } else { er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject) - sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er) + sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er) } // Send a confirmation message back to the publisher @@ -308,7 +311,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na mf, ok := p.methodsAvailable.CheckIfExists(message.Method) if !ok { er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent) - sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er) + sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er) } // Check if we are allowed to receive from that host @@ -328,30 +331,30 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na if err != nil { er := fmt.Errorf("error: subscriberHandler: failed to execute event: %v", err) - sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er) + sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er) } } else { er := fmt.Errorf("info: we don't allow receiving from: %v, %v", message.FromNode, p.subject) - sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er) + sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er) } // --- default: er := fmt.Errorf("info: did not find that specific type of command: %#v", p.subject.CommandOrEvent) - sendErrorLogMessage(s.toRingbufferCh, node(thisNode), er) + sendErrorLogMessage(p.toRingbufferCh, node(thisNode), er) } } // Subscribe will start up a Go routine under the hood calling the // callback function specified when a new message is received. -func (p process) subscribeMessages(s *server) { +func (p process) subscribeMessages() { subject := string(p.subject.name()) - _, err := s.natsConn.Subscribe(subject, func(msg *nats.Msg) { + _, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) { // We start one handler per message received by using go routines here. // This is for being able to reply back the current publisher who sent // the message. - go p.subscriberHandler(s.natsConn, s.nodeName, msg, s) + go p.subscriberHandler(p.natsConn, p.configuration.NodeName, msg) }) if err != nil { log.Printf("error: Subscribe failed: %v\n", err) diff --git a/server.go b/server.go index 2b6d678..2cda531 100644 --- a/server.go +++ b/server.go @@ -277,9 +277,9 @@ func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subject log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName) sub := newSubject(sam.Subject.Method, sam.Subject.ToNode) - proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil, nil) + proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil, nil) // fmt.Printf("*** %#v\n", proc) - proc.spawnWorker(s) + proc.spawnWorker(s.processes, s.natsConn) // REMOVED: //time.Sleep(time.Millisecond * 500) diff --git a/startup_processes.go b/startup_processes.go index 6ae1967..eb2d438 100644 --- a/startup_processes.go +++ b/startup_processes.go @@ -15,8 +15,8 @@ func (s *server) ProcessesStart() { { fmt.Printf("Starting REQOpCommand subscriber: %#v\n", s.nodeName) sub := newSubject(REQOpCommand, s.nodeName) - proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil) - go proc.spawnWorker(s) + proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil) + go proc.spawnWorker(s.processes, s.natsConn) } // Start a subscriber for textLogging messages @@ -24,9 +24,9 @@ func (s *server) ProcessesStart() { { fmt.Printf("Starting text logging subscriber: %#v\n", s.nodeName) sub := newSubject(REQTextToLogFile, s.nodeName) - proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQTextToLogFile.Values, nil) + proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQTextToLogFile.Values, nil) // fmt.Printf("*** %#v\n", proc) - go proc.spawnWorker(s) + go proc.spawnWorker(s.processes, s.natsConn) } } @@ -35,9 +35,9 @@ func (s *server) ProcessesStart() { { fmt.Printf("Starting text to file subscriber: %#v\n", s.nodeName) sub := newSubject(REQTextToFile, s.nodeName) - proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQTextToFile.Values, nil) + proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQTextToFile.Values, nil) // fmt.Printf("*** %#v\n", proc) - go proc.spawnWorker(s) + go proc.spawnWorker(s.processes, s.natsConn) } } @@ -46,7 +46,7 @@ func (s *server) ProcessesStart() { { fmt.Printf("Starting Hello subscriber: %#v\n", s.nodeName) sub := newSubject(REQHello, s.nodeName) - proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQHello.Values, nil) + proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQHello.Values, nil) proc.procFuncCh = make(chan Message) // The reason for running the say hello subscriber as a procFunc is that @@ -72,7 +72,7 @@ func (s *server) ProcessesStart() { } } } - go proc.spawnWorker(s) + go proc.spawnWorker(s.processes, s.natsConn) } } @@ -81,8 +81,8 @@ func (s *server) ProcessesStart() { { fmt.Printf("Starting REQErrorLog subscriber: %#v\n", s.nodeName) sub := newSubject(REQErrorLog, "errorCentral") - proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQErrorLog.Values, nil) - go proc.spawnWorker(s) + proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQErrorLog.Values, nil) + go proc.spawnWorker(s.processes, s.natsConn) } } @@ -91,8 +91,8 @@ func (s *server) ProcessesStart() { { fmt.Printf("Starting Ping Request subscriber: %#v\n", s.nodeName) sub := newSubject(REQPing, s.nodeName) - proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQPing.Values, nil) - go proc.spawnWorker(s) + proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQPing.Values, nil) + go proc.spawnWorker(s.processes, s.natsConn) } } @@ -101,8 +101,8 @@ func (s *server) ProcessesStart() { { fmt.Printf("Starting Pong subscriber: %#v\n", s.nodeName) sub := newSubject(REQPong, s.nodeName) - proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQPong.Values, nil) - go proc.spawnWorker(s) + proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQPong.Values, nil) + go proc.spawnWorker(s.processes, s.natsConn) } } @@ -111,8 +111,8 @@ func (s *server) ProcessesStart() { { fmt.Printf("Starting CLICommand Request subscriber: %#v\n", s.nodeName) sub := newSubject(REQCliCommand, s.nodeName) - proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQCliCommand.Values, nil) - go proc.spawnWorker(s) + proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQCliCommand.Values, nil) + go proc.spawnWorker(s.processes, s.natsConn) } } @@ -121,8 +121,8 @@ func (s *server) ProcessesStart() { { fmt.Printf("Starting CLICommand Not Sequential Request subscriber: %#v\n", s.nodeName) sub := newSubject(REQnCliCommand, s.nodeName) - proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQnCliCommand.Values, nil) - go proc.spawnWorker(s) + proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQnCliCommand.Values, nil) + go proc.spawnWorker(s.processes, s.natsConn) } } @@ -131,8 +131,8 @@ func (s *server) ProcessesStart() { { fmt.Printf("Starting Text To Console subscriber: %#v\n", s.nodeName) sub := newSubject(REQTextToConsole, s.nodeName) - proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQTextToConsole.Values, nil) - go proc.spawnWorker(s) + proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQTextToConsole.Values, nil) + go proc.spawnWorker(s.processes, s.natsConn) } } @@ -146,7 +146,7 @@ func (s *server) ProcessesStart() { fmt.Printf("Starting Hello Publisher: %#v\n", s.nodeName) sub := newSubject(REQHello, s.configuration.CentralNodeName) - proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, []node{}, nil) + proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, []node{}, nil) // Define the procFunc to be used for the process. proc.procFunc = procFunc( @@ -172,7 +172,7 @@ func (s *server) ProcessesStart() { time.Sleep(time.Second * time.Duration(s.configuration.StartPubREQHello)) } }) - go proc.spawnWorker(s) + go proc.spawnWorker(s.processes, s.natsConn) } // Start a subscriber for Http Get Requests @@ -180,9 +180,9 @@ func (s *server) ProcessesStart() { { fmt.Printf("Starting Http Get subscriber: %#v\n", s.nodeName) sub := newSubject(REQHttpGet, s.nodeName) - proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQHttpGet.Values, nil) + proc := newProcess(s.natsConn, s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQHttpGet.Values, nil) // fmt.Printf("*** %#v\n", proc) - go proc.spawnWorker(s) + go proc.spawnWorker(s.processes, s.natsConn) } } } diff --git a/subscriber_method_types.go b/subscriber_method_types.go index 26453a9..0629124 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -215,7 +215,7 @@ func (m methodREQOpCommand) getKind() CommandOrEvent { // handler to run a CLI command with timeout context. The handler will // return the output of the command run back to the calling publisher // in the ack message. -func (m methodREQOpCommand) handler(proc process, message Message, node string) ([]byte, error) { +func (m methodREQOpCommand) handler(proc process, message Message, nodeName string) ([]byte, error) { go func() { out := []byte{} @@ -232,6 +232,26 @@ func (m methodREQOpCommand) handler(proc process, message Message, node string) } proc.processes.mu.Unlock() + case message.Data[0] == "startProc": + if len(message.Data) < 2 { + er := fmt.Errorf("error: startProc: no allowed publisher nodes specified: %v" + fmt.Sprint(message)) + sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + return + } + + newMethod := Method(message.Data[1]) + + // We need to convert the []string to []node + aps := message.Data[2:] + var allowedPublishers []node + for _, v := range aps { + allowedPublishers = append(allowedPublishers, node(v)) + } + + sub := newSubject(newMethod, proc.configuration.NodeName) + procNew := newProcess(proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, allowedPublishers, nil) + go procNew.spawnWorker(proc.processes, proc.natsConn) + default: er := fmt.Errorf("error: no such OpCommand specified: " + message.Data[0]) sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) @@ -243,7 +263,7 @@ func (m methodREQOpCommand) handler(proc process, message Message, node string) newReplyMessage(proc, message, REQTextToLogFile, out) }() - ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n", node, message.ID)) + ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n", proc.node, message.ID)) return ackMsg, nil } diff --git a/var/errorLog/central/errorCentral.REQErrorLog b/var/errorLog/central/errorCentral.REQErrorLog index b8a79bb..8349359 100644 --- a/var/errorLog/central/errorCentral.REQErrorLog +++ b/var/errorLog/central/errorCentral.REQErrorLog @@ -1,4 +1,4 @@ -2021-04-07 06:11:54.599555 +0000 UTC, error: subReply.NextMsg failed for node=ship2, subject=ship2.REQHttpGet.EventACK: nats: timeout -2021-04-07 06:11:59.601342 +0000 UTC, error: subReply.NextMsg failed for node=ship2, subject=ship2.REQHttpGet.EventACK: nats: timeout -2021-04-07 06:12:04.603312 +0000 UTC, error: subReply.NextMsg failed for node=ship2, subject=ship2.REQHttpGet.EventACK: nats: timeout -2021-04-07 06:12:04.603423 +0000 UTC, info: max retries for message reached, breaking out: {ship2 1 [http://vg.no] REQHttpGet central 5 3 0 0 5 metrics/network/sniffer .html 0xc0006dc000} +2021-04-07 14:14:13.750212 +0000 UTC, error: subReply.NextMsg failed for node=ship2, subject=ship2.REQHttpGet.EventACK: nats: timeout +2021-04-07 14:14:18.755501 +0000 UTC, error: subReply.NextMsg failed for node=ship2, subject=ship2.REQHttpGet.EventACK: nats: timeout +2021-04-07 14:14:23.760201 +0000 UTC, error: subReply.NextMsg failed for node=ship2, subject=ship2.REQHttpGet.EventACK: nats: timeout +2021-04-07 14:14:23.760319 +0000 UTC, info: max retries for message reached, breaking out: {ship2 5 [http://vg.no] REQHttpGet central 5 3 0 0 5 metrics/network/sniffer .html 0xc00042e240}