1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

op startProc initially seems to work

This commit is contained in:
postmannen 2021-04-08 12:51:54 +02:00
parent e2f56e4427
commit ab49f5a4de
15 changed files with 141 additions and 28 deletions

View file

@ -2,7 +2,7 @@
{
"directory": "metrics/network/sniffer",
"fileExtension": ".html",
"toNode": "ship2",
"toNode": "ship1",
"data": ["http://vg.no"],
"method":"REQHttpGet",
"timeout":5,

View file

@ -2,7 +2,7 @@
{
"directory":"opcommand_logs",
"fileExtension": ".log",
"toNode": "ship2",
"toNode": "ship1",
"data": ["ps"],
"method":"REQOpCommand",
"timeout":3,

View file

@ -0,0 +1,14 @@
[
{
"directory":"opcommand_logs",
"fileExtension": ".log",
"toNode": "ship1",
"data": ["startProc","REQHttpGet","central"],
"method":"REQOpCommand",
"timeout":3,
"retries":3,
"requestTimeout":3,
"requestRetries":3,
"MethodTimeout": 7
}
]

View file

@ -0,0 +1,14 @@
[
{
"directory":"opcommand_logs",
"fileExtension": ".log",
"toNode": "ship1",
"data": ["stopProc","REQHello","publisher","central"],
"method":"REQOpCommand",
"timeout":3,
"retries":3,
"requestTimeout":3,
"requestRetries":3,
"MethodTimeout": 7
}
]

View file

@ -0,0 +1,14 @@
[
{
"directory":"opcommand_logs",
"fileExtension": ".log",
"toNode": "ship1",
"data": ["stopProc","REQHttpGet","subscriber"],
"method":"REQOpCommand",
"timeout":3,
"retries":3,
"requestTimeout":3,
"requestRetries":3,
"MethodTimeout": 7
}
]

View file

@ -2,7 +2,7 @@
{
"directory": "ping",
"fileExtension":".ping.log",
"toNode": "ship2",
"toNode": "ship1",
"data": [""],
"method":"REQPing",
"timeout":3,

View file

@ -0,0 +1,12 @@
[
{
"directory": "metrics/network/sniffer",
"fileExtension": ".html",
"toNode": "ship2",
"data": ["http://erter.org"],
"method":"REQHttpGet",
"timeout":5,
"retries":3,
"methodTimeout": 5
}
]

View file

@ -0,0 +1,14 @@
[
{
"directory":"opcommand_logs",
"fileExtension": ".log",
"toNode": "ship2",
"data": ["ps"],
"method":"REQOpCommand",
"timeout":3,
"retries":3,
"requestTimeout":3,
"requestRetries":3,
"MethodTimeout": 7
}
]

View file

@ -0,0 +1,14 @@
[
{
"directory":"opcommand_logs",
"fileExtension": ".log",
"toNode": "ship2",
"data": ["stopProc","REQHello","publisher"],
"method":"REQOpCommand",
"timeout":3,
"retries":3,
"requestTimeout":3,
"requestRetries":3,
"MethodTimeout": 7
}
]

View file

@ -62,7 +62,7 @@ type process struct {
// nats connection
natsConn *nats.Conn
// natsSubscription returned when calling natsConn.Subscribe
natsSubscription string
natsSubscription *nats.Subscription
// context
ctx context.Context
// context cancelFunc
@ -149,13 +149,6 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
pn = processNameGet(p.subject.name(), processKindSubscriber)
}
p.processName = pn
// Add information about the new process to the started processes map.
procs.mu.Lock()
procs.active[pn] = p
procs.mu.Unlock()
// Start a publisher worker, which will start a go routine (process)
// That will take care of all the messages for the subject it owns.
if p.processKind == processKindPublisher {
@ -174,7 +167,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
}()
}
go p.publishMessages(natsConn, procs)
go p.publishMessages(natsConn)
}
// Start a subscriber worker, which will start a go routine (process)
@ -194,8 +187,15 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
}()
}
p.subscribeMessages()
p.natsSubscription = p.subscribeMessages()
}
p.processName = pn
// Add information about the new process to the started processes map.
procs.mu.Lock()
procs.active[pn] = p
procs.mu.Unlock()
}
// messageDeliverNats will take care of the delivering the message
@ -375,10 +375,10 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
// Subscribe will start up a Go routine under the hood calling the
// callback function specified when a new message is received.
func (p process) subscribeMessages() {
func (p process) subscribeMessages() *nats.Subscription {
subject := string(p.subject.name())
//natsSubscription, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) {
_, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) {
natsSubscription, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) {
//_, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) {
// We start one handler per message received by using go routines here.
// This is for being able to reply back the current publisher who sent
@ -387,12 +387,15 @@ func (p process) subscribeMessages() {
})
if err != nil {
log.Printf("error: Subscribe failed: %v\n", err)
return nil
}
return natsSubscription
}
// publishMessages will do the publishing of messages for one single
// process.
func (p process) publishMessages(natsConn *nats.Conn, processes *processes) {
func (p process) publishMessages(natsConn *nats.Conn) {
for {
var err error
var m Message
@ -410,7 +413,7 @@ func (p process) publishMessages(natsConn *nats.Conn, processes *processes) {
// Get the process name so we can look up the process in the
// processes map, and increment the message counter.
pn := processNameGet(p.subject.name(), processKindPublisher)
m.ID = processes.active[pn].messageID
m.ID = p.messageID
p.messageDeliverNats(natsConn, m)
@ -420,9 +423,9 @@ func (p process) publishMessages(natsConn *nats.Conn, processes *processes) {
// Increment the counter for the next message to be sent.
p.messageID++
processes.mu.Lock()
processes.active[pn] = p
processes.mu.Unlock()
p.processes.mu.Lock()
p.processes.active[pn] = p
p.processes.mu.Unlock()
// Handle the error.
//

View file

@ -184,6 +184,7 @@ func (s *server) ProcessesStart() {
select {
case <-ticker.C:
case <-ctx.Done():
fmt.Printf(" ** DEBUG: got <- ctx.Done\n")
er := fmt.Errorf("info: stopped handleFunc for: %v", proc.subject.name())
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
return nil

View file

@ -253,8 +253,18 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
go procNew.spawnWorker(proc.processes, proc.natsConn)
case message.Data[0] == "stopProc":
// Data layout: OPCommand, Method, publisher/subscriber
if len(message.Data) < 3 {
fmt.Printf(" ** DEBUG 0: got stopProc\n")
// Data layout: OPCommand, Method, publisher/subscriber, receivingNode
//
// The processes can be either publishers or subscribers. The subject name
// are used within naming a process. Since the subject structure contains
// the node name of the node that will receive this message we also need
// specify it so we are able to delete the publisher processes, since a
// publisher process will have the name of the node to receive the message,
// and not just the local node name as with subscriber processes.
// receive the message we need to specify
if len(message.Data) < 4 {
fmt.Printf(`error: DEBUG: stopProc: not enough data values. want "<OPCommand>", "<Method>", "<publisher/subscriber>","<receiving nodeName>": %v` + fmt.Sprint(message))
er := fmt.Errorf(`error: stopProc: not enough data values. want "<OPCommand>", "<Method>", "<publisher/subscriber>": %v` + fmt.Sprint(message))
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
return
@ -267,23 +277,42 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
toStopMethod := Method(message.Data[1])
pubOrSub := processKind(message.Data[2])
recevingNode := processKind(message.Data[3])
// ..check if valid
sub := newSubject(toStopMethod, proc.configuration.NodeName)
sub := newSubject(toStopMethod, string(recevingNode))
processName := processNameGet(sub.name(), pubOrSub)
// ..check if valid
fmt.Printf(" ** DEBUG1: processName: %v\n", processName)
fmt.Printf(" ** DEBUG1.1: Before mutex lock\n")
proc.processes.mu.Lock()
fmt.Printf(" ** DEBUG1.2: After mutex lock\n")
for k, v := range proc.processes.active {
fmt.Printf(" ** DEBUG1.3: MAP: k = %v, v = %v\n", k, v.processKind)
}
toStopProc, ok := proc.processes.active[processName]
fmt.Printf(" ** DEBUG2.1: toStopProc: %v\n", toStopProc)
if ok {
fmt.Printf(" ** DEBUG2.2: toStopProc: %v\n", toStopProc)
fmt.Printf(" ** STOP: processName: %v\n", processName)
fmt.Printf(" ** STOP: toStopProc: %v\n", toStopProc)
// Delete the process from the processes map
delete(proc.processes.active, processName)
// Stop started go routines that belong to the process.
toStopProc.ctxCancel()
// Stop subscribing for messages on the process's subject.
err := toStopProc.natsSubscription.Unsubscribe()
if err != nil {
log.Printf(" ** Error: failed to stop *nats.Subscription: %v\n", err)
}
}
proc.processes.mu.Unlock()
default:
fmt.Printf("error: no such OpCommand specified: " + message.Data[0])
er := fmt.Errorf("error: no such OpCommand specified: " + message.Data[0])
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
return

View file

@ -1,4 +1,2 @@
2021-04-07 14:14:13.750212 +0000 UTC, error: subReply.NextMsg failed for node=ship2, subject=ship2.REQHttpGet.EventACK: nats: timeout
2021-04-07 14:14:18.755501 +0000 UTC, error: subReply.NextMsg failed for node=ship2, subject=ship2.REQHttpGet.EventACK: nats: timeout
2021-04-07 14:14:23.760201 +0000 UTC, error: subReply.NextMsg failed for node=ship2, subject=ship2.REQHttpGet.EventACK: nats: timeout
2021-04-07 14:14:23.760319 +0000 UTC, info: max retries for message reached, breaking out: {ship2 5 [http://vg.no] REQHttpGet central 5 3 0 0 5 metrics/network/sniffer .html <nil> 0xc00042e240}
2021-04-08 10:18:45.748974 +0000 UTC, info: canceling publisher: central.REQHello.EventNACK
2021-04-08 10:18:46.252822 +0000 UTC, error: subscriberHandler: failed to execute event: open var/errorLog/central/errorCentral.REQErrorLog: permission denied