1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-18 21:59:30 +00:00

cleaned up errorKernel references

This commit is contained in:
postmannen 2022-04-01 08:43:14 +02:00
parent f0c84f48db
commit 56b432f402
4 changed files with 116 additions and 114 deletions

View file

@ -103,14 +103,14 @@ func (s *server) readStartupFolder() {
mh, ok := p.methodsAvailable.CheckIfExists(sams[i].Message.Method) mh, ok := p.methodsAvailable.CheckIfExists(sams[i].Message.Method)
if !ok { if !ok {
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Event) er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Event)
p.processes.errorKernel.errSend(p, sams[i].Message, er) p.errorKernel.errSend(p, sams[i].Message, er)
continue continue
} }
_, err = mh.handler(p, sams[i].Message, s.nodeName) _, err = mh.handler(p, sams[i].Message, s.nodeName)
if err != nil { if err != nil {
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
p.processes.errorKernel.errSend(p, sams[i].Message, er) p.errorKernel.errSend(p, sams[i].Message, er)
continue continue
} }
} }
@ -164,7 +164,7 @@ func (s *server) readSocket() {
conn, err := s.StewardSocket.Accept() conn, err := s.StewardSocket.Accept()
if err != nil { if err != nil {
er := fmt.Errorf("error: failed to accept conn on socket: %v", err) er := fmt.Errorf("error: failed to accept conn on socket: %v", err)
s.processes.errorKernel.errSend(s.processInitial, Message{}, er) s.errorKernel.errSend(s.processInitial, Message{}, er)
} }
go func(conn net.Conn) { go func(conn net.Conn) {
@ -177,7 +177,7 @@ func (s *server) readSocket() {
_, err = conn.Read(b) _, err = conn.Read(b)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
er := fmt.Errorf("error: failed to read data from tcp listener: %v", err) er := fmt.Errorf("error: failed to read data from tcp listener: %v", err)
s.processes.errorKernel.errSend(s.processInitial, Message{}, er) s.errorKernel.errSend(s.processInitial, Message{}, er)
return return
} }
@ -194,7 +194,7 @@ func (s *server) readSocket() {
sams, err := s.convertBytesToSAMs(readBytes) sams, err := s.convertBytesToSAMs(readBytes)
if err != nil { if err != nil {
er := fmt.Errorf("error: malformed json received on socket: %v", err) er := fmt.Errorf("error: malformed json received on socket: %v", err)
s.processes.errorKernel.errSend(s.processInitial, Message{}, er) s.errorKernel.errSend(s.processInitial, Message{}, er)
return return
} }
@ -207,7 +207,7 @@ func (s *server) readSocket() {
// Send an info message to the central about the message picked // Send an info message to the central about the message picked
// for auditing. // for auditing.
er := fmt.Errorf("info: message read from socket on %v: %v", s.nodeName, sams[i].Message) er := fmt.Errorf("info: message read from socket on %v: %v", s.nodeName, sams[i].Message)
s.processes.errorKernel.errSend(s.processInitial, Message{}, er) s.errorKernel.errSend(s.processInitial, Message{}, er)
} }
// Send the SAM struct to be picked up by the ring buffer. // Send the SAM struct to be picked up by the ring buffer.
@ -233,7 +233,7 @@ func (s *server) readTCPListener() {
conn, err := ln.Accept() conn, err := ln.Accept()
if err != nil { if err != nil {
er := fmt.Errorf("error: failed to accept conn on socket: %v", err) er := fmt.Errorf("error: failed to accept conn on socket: %v", err)
s.processes.errorKernel.errSend(s.processInitial, Message{}, er) s.errorKernel.errSend(s.processInitial, Message{}, er)
continue continue
} }
@ -247,7 +247,7 @@ func (s *server) readTCPListener() {
_, err = conn.Read(b) _, err = conn.Read(b)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
er := fmt.Errorf("error: failed to read data from tcp listener: %v", err) er := fmt.Errorf("error: failed to read data from tcp listener: %v", err)
s.processes.errorKernel.errSend(s.processInitial, Message{}, er) s.errorKernel.errSend(s.processInitial, Message{}, er)
return return
} }
@ -264,7 +264,7 @@ func (s *server) readTCPListener() {
sam, err := s.convertBytesToSAMs(readBytes) sam, err := s.convertBytesToSAMs(readBytes)
if err != nil { if err != nil {
er := fmt.Errorf("error: malformed json received on tcp listener: %v", err) er := fmt.Errorf("error: malformed json received on tcp listener: %v", err)
s.processes.errorKernel.errSend(s.processInitial, Message{}, er) s.errorKernel.errSend(s.processInitial, Message{}, er)
return return
} }
@ -291,7 +291,7 @@ func (s *server) readHTTPlistenerHandler(w http.ResponseWriter, r *http.Request)
_, err := r.Body.Read(b) _, err := r.Body.Read(b)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
er := fmt.Errorf("error: failed to read data from tcp listener: %v", err) er := fmt.Errorf("error: failed to read data from tcp listener: %v", err)
s.processes.errorKernel.errSend(s.processInitial, Message{}, er) s.errorKernel.errSend(s.processInitial, Message{}, er)
return return
} }
@ -308,7 +308,7 @@ func (s *server) readHTTPlistenerHandler(w http.ResponseWriter, r *http.Request)
sam, err := s.convertBytesToSAMs(readBytes) sam, err := s.convertBytesToSAMs(readBytes)
if err != nil { if err != nil {
er := fmt.Errorf("error: malformed json received on HTTPListener: %v", err) er := fmt.Errorf("error: malformed json received on HTTPListener: %v", err)
s.processes.errorKernel.errSend(s.processInitial, Message{}, er) s.errorKernel.errSend(s.processInitial, Message{}, er)
return return
} }
@ -376,7 +376,7 @@ func (s *server) convertBytesToSAMs(b []byte) ([]subjectAndMessage, error) {
sm, err := newSubjectAndMessage(m) sm, err := newSubjectAndMessage(m)
if err != nil { if err != nil {
er := fmt.Errorf("error: newSubjectAndMessage: %v", err) er := fmt.Errorf("error: newSubjectAndMessage: %v", err)
s.processes.errorKernel.errSend(s.processInitial, m, er) s.errorKernel.errSend(s.processInitial, m, er)
continue continue
} }
@ -420,7 +420,7 @@ func (s *server) checkMessageToNodes(MsgSlice []Message) []Message {
// the slice since it is not valid. // the slice since it is not valid.
default: default:
er := fmt.Errorf("error: no toNode or toNodes where specified in the message, dropping message: %v", v) er := fmt.Errorf("error: no toNode or toNodes where specified in the message, dropping message: %v", v)
s.processes.errorKernel.errSend(s.processInitial, v, er) s.errorKernel.errSend(s.processInitial, v, er)
continue continue
} }
} }

View file

@ -99,9 +99,10 @@ type process struct {
// startup holds the startup functions for starting up publisher // startup holds the startup functions for starting up publisher
// or subscriber processes // or subscriber processes
startup *startup startup *startup
// Signatures // Signatures
signatures *signatures signatures *signatures
// errorKernel
errorKernel *errorKernel
} }
// prepareNewProcess will set the the provided values and the default // prepareNewProcess will set the the provided values and the default
@ -130,6 +131,7 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin
ctxCancel: cancel, ctxCancel: cancel,
startup: newStartup(server), startup: newStartup(server),
signatures: server.signatures, signatures: server.signatures,
errorKernel: server.errorKernel,
} }
return proc return proc
@ -175,7 +177,7 @@ func (p process) spawnWorker() {
err := p.procFunc(p.ctx, p.procFuncCh) err := p.procFunc(p.ctx, p.procFuncCh)
if err != nil { if err != nil {
er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err) er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err)
p.processes.errorKernel.errSend(p, Message{}, er) p.errorKernel.errSend(p, Message{}, er)
} }
}() }()
} }
@ -198,7 +200,7 @@ func (p process) spawnWorker() {
err := p.procFunc(p.ctx, p.procFuncCh) err := p.procFunc(p.ctx, p.procFuncCh)
if err != nil { if err != nil {
er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err) er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err)
p.processes.errorKernel.errSend(p, Message{}, er) p.errorKernel.errSend(p, Message{}, er)
} }
}() }()
} }
@ -285,7 +287,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
if err != nil { if err != nil {
er := fmt.Errorf("error: ack receive failed: subject=%v: %v", p.subject.name(), err) er := fmt.Errorf("error: ack receive failed: subject=%v: %v", p.subject.name(), err)
// sendErrorLogMessage(p.toRingbufferCh, p.node, er) // sendErrorLogMessage(p.toRingbufferCh, p.node, er)
p.processes.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
if err == nats.ErrNoResponders { if err == nats.ErrNoResponders {
// fmt.Printf(" * DEBUG: Waiting, ACKTimeout: %v\n", message.ACKTimeout) // fmt.Printf(" * DEBUG: Waiting, ACKTimeout: %v\n", message.ACKTimeout)
@ -295,7 +297,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
// did not receive a reply, decide what to do.. // did not receive a reply, decide what to do..
retryAttempts++ retryAttempts++
er = fmt.Errorf("retry attempt:%v, retries: %v, ack timeout: %v, message.ID: %v", retryAttempts, message.Retries, message.ACKTimeout, message.ID) er = fmt.Errorf("retry attempt:%v, retries: %v, ack timeout: %v, message.ID: %v", retryAttempts, message.Retries, message.ACKTimeout, message.ID)
p.processes.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
switch { switch {
//case message.Retries == 0: //case message.Retries == 0:
@ -308,7 +310,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
// We do not want to send errorLogs for REQErrorLog type since // We do not want to send errorLogs for REQErrorLog type since
// it will just cause an endless loop. // it will just cause an endless loop.
if message.Method != REQErrorLog { if message.Method != REQErrorLog {
p.processes.errorKernel.infoSend(p, message, er) p.errorKernel.infoSend(p, message, er)
} }
subReply.Unsubscribe() subReply.Unsubscribe()
@ -319,7 +321,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
default: default:
// none of the above matched, so we've not reached max retries yet // none of the above matched, so we've not reached max retries yet
er := fmt.Errorf("max retries for message not reached, retrying sending of message with ID %v", message.ID) er := fmt.Errorf("max retries for message not reached, retrying sending of message with ID %v", message.ID)
p.processes.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
p.server.metrics.promNatsMessagesMissedACKsTotal.Inc() p.server.metrics.promNatsMessagesMissedACKsTotal.Inc()
@ -360,7 +362,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
// If debugging is enabled, print the source node name of the nats messages received. // If debugging is enabled, print the source node name of the nats messages received.
if val, ok := msg.Header["fromNode"]; ok { if val, ok := msg.Header["fromNode"]; ok {
er := fmt.Errorf("info: nats message received from %v, with subject %v ", val, subject) er := fmt.Errorf("info: nats message received from %v, with subject %v ", val, subject)
p.processes.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
} }
// If compression is used, decompress it to get the gob data. If // If compression is used, decompress it to get the gob data. If
@ -372,13 +374,13 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
zr, err := zstd.NewReader(nil) zr, err := zstd.NewReader(nil)
if err != nil { if err != nil {
er := fmt.Errorf("error: zstd NewReader failed: %v", err) er := fmt.Errorf("error: zstd NewReader failed: %v", err)
p.processes.errorKernel.errSend(p, Message{}, er) p.errorKernel.errSend(p, Message{}, er)
return return
} }
msgData, err = zr.DecodeAll(msg.Data, nil) msgData, err = zr.DecodeAll(msg.Data, nil)
if err != nil { if err != nil {
er := fmt.Errorf("error: zstd decoding failed: %v", err) er := fmt.Errorf("error: zstd decoding failed: %v", err)
p.processes.errorKernel.errSend(p, Message{}, er) p.errorKernel.errSend(p, Message{}, er)
zr.Close() zr.Close()
return return
} }
@ -390,14 +392,14 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
gr, err := gzip.NewReader(r) gr, err := gzip.NewReader(r)
if err != nil { if err != nil {
er := fmt.Errorf("error: gzip NewReader failed: %v", err) er := fmt.Errorf("error: gzip NewReader failed: %v", err)
p.processes.errorKernel.errSend(p, Message{}, er) p.errorKernel.errSend(p, Message{}, er)
return return
} }
b, err := io.ReadAll(gr) b, err := io.ReadAll(gr)
if err != nil { if err != nil {
er := fmt.Errorf("error: gzip ReadAll failed: %v", err) er := fmt.Errorf("error: gzip ReadAll failed: %v", err)
p.processes.errorKernel.errSend(p, Message{}, er) p.errorKernel.errSend(p, Message{}, er)
return return
} }
@ -418,7 +420,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
err := cbor.Unmarshal(msgData, &message) err := cbor.Unmarshal(msgData, &message)
if err != nil { if err != nil {
er := fmt.Errorf("error: cbor decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err) er := fmt.Errorf("error: cbor decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
p.processes.errorKernel.errSend(p, message, er) p.errorKernel.errSend(p, message, er)
return return
} }
default: // Deaults to gob if no match was found. default: // Deaults to gob if no match was found.
@ -428,7 +430,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
err := gobDec.Decode(&message) err := gobDec.Decode(&message)
if err != nil { if err != nil {
er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err) er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
p.processes.errorKernel.errSend(p, message, er) p.errorKernel.errSend(p, message, er)
return return
} }
} }
@ -441,7 +443,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
err := gobDec.Decode(&message) err := gobDec.Decode(&message)
if err != nil { if err != nil {
er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err) er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
p.processes.errorKernel.errSend(p, message, er) p.errorKernel.errSend(p, message, er)
return return
} }
} }
@ -464,7 +466,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
switch { switch {
case msgCopy.PreviousMessage.RelayReplyMethod == "": case msgCopy.PreviousMessage.RelayReplyMethod == "":
er := fmt.Errorf("error: subscriberHandler: no PreviousMessage.RelayReplyMethod found, defaulting to the reply method of previous message: %v ", msgCopy) er := fmt.Errorf("error: subscriberHandler: no PreviousMessage.RelayReplyMethod found, defaulting to the reply method of previous message: %v ", msgCopy)
p.processes.errorKernel.errSend(p, message, er) p.errorKernel.errSend(p, message, er)
msgCopy.Method = msgCopy.PreviousMessage.ReplyMethod msgCopy.Method = msgCopy.PreviousMessage.ReplyMethod
@ -481,7 +483,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
sam, err := newSubjectAndMessage(msgCopy) sam, err := newSubjectAndMessage(msgCopy)
if err != nil { if err != nil {
er := fmt.Errorf("error: subscriberHandler: newSubjectAndMessage : %v, message copy: %v", err, msgCopy) er := fmt.Errorf("error: subscriberHandler: newSubjectAndMessage : %v, message copy: %v", err, msgCopy)
p.processes.errorKernel.errSend(p, message, er) p.errorKernel.errSend(p, message, er)
} }
p.toRingbufferCh <- []subjectAndMessage{sam} p.toRingbufferCh <- []subjectAndMessage{sam}
@ -509,7 +511,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
mh, ok := p.methodsAvailable.CheckIfExists(message.Method) mh, ok := p.methodsAvailable.CheckIfExists(message.Method)
if !ok { if !ok {
er := fmt.Errorf("error: subscriberHandler: no such method type: %v", p.subject.Event) er := fmt.Errorf("error: subscriberHandler: no such method type: %v", p.subject.Event)
p.processes.errorKernel.errSend(p, message, er) p.errorKernel.errSend(p, message, er)
} }
out := []byte{} out := []byte{}
@ -520,7 +522,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
out, err = mh.handler(p, message, thisNode) out, err = mh.handler(p, message, thisNode)
if err != nil { if err != nil {
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
p.processes.errorKernel.errSend(p, message, er) p.errorKernel.errSend(p, message, er)
} }
} }
@ -532,7 +534,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
mf, ok := p.methodsAvailable.CheckIfExists(message.Method) mf, ok := p.methodsAvailable.CheckIfExists(message.Method)
if !ok { if !ok {
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Event) er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Event)
p.processes.errorKernel.errSend(p, message, er) p.errorKernel.errSend(p, message, er)
} }
if p.signatures.verifySignature(message) { if p.signatures.verifySignature(message) {
@ -541,13 +543,13 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
if err != nil { if err != nil {
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
p.processes.errorKernel.errSend(p, message, er) p.errorKernel.errSend(p, message, er)
} }
} }
default: default:
er := fmt.Errorf("info: did not find that specific type of event: %#v", p.subject.Event) er := fmt.Errorf("info: did not find that specific type of event: %#v", p.subject.Event)
p.processes.errorKernel.infoSend(p, message, er) p.errorKernel.infoSend(p, message, er)
} }
} }
@ -642,7 +644,7 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once,
b, err := cbor.Marshal(m) b, err := cbor.Marshal(m)
if err != nil { if err != nil {
er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err) er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err)
p.processes.errorKernel.errSend(p, m, er) p.errorKernel.errSend(p, m, er)
return return
} }
@ -655,7 +657,7 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once,
err := gobEnc.Encode(m) err := gobEnc.Encode(m)
if err != nil { if err != nil {
er := fmt.Errorf("error: messageDeliverNats: gob encode message failed: %v", err) er := fmt.Errorf("error: messageDeliverNats: gob encode message failed: %v", err)
p.processes.errorKernel.errSend(p, m, er) p.errorKernel.errSend(p, m, er)
return return
} }
@ -711,7 +713,7 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once,
// We only wan't to send the error message to errorCentral once. // We only wan't to send the error message to errorCentral once.
once.Do(func() { once.Do(func() {
p.processes.errorKernel.errSend(p, m, er) p.errorKernel.errSend(p, m, er)
}) })
// No compression, so we just assign the value of the serialized // No compression, so we just assign the value of the serialized

View file

@ -274,7 +274,7 @@ func (s startup) pubREQHello(p process) {
sam, err := newSubjectAndMessage(m) sam, err := newSubjectAndMessage(m)
if err != nil { if err != nil {
// In theory the system should drop the message before it reaches here. // In theory the system should drop the message before it reaches here.
p.processes.errorKernel.errSend(p, m, err) p.errorKernel.errSend(p, m, err)
log.Printf("error: ProcessesStart: %v\n", err) log.Printf("error: ProcessesStart: %v\n", err)
} }
proc.toRingbufferCh <- []subjectAndMessage{sam} proc.toRingbufferCh <- []subjectAndMessage{sam}

View file

@ -354,7 +354,7 @@ func newReplyMessage(proc process, message Message, outData []byte) {
if err != nil { if err != nil {
// In theory the system should drop the message before it reaches here. // In theory the system should drop the message before it reaches here.
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
} }
proc.toRingbufferCh <- []subjectAndMessage{sam} proc.toRingbufferCh <- []subjectAndMessage{sam}
@ -458,7 +458,7 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str
switch { switch {
case len(message.MethodArgs) < 1: case len(message.MethodArgs) < 1:
er := fmt.Errorf("error: methodREQOpProcessStart: got <1 number methodArgs") er := fmt.Errorf("error: methodREQOpProcessStart: got <1 number methodArgs")
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
} }
@ -467,7 +467,7 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str
tmpH := mt.getHandler(Method(method)) tmpH := mt.getHandler(Method(method))
if tmpH == nil { if tmpH == nil {
er := fmt.Errorf("error: OpProcessStart: no such request type defined: %v" + m) er := fmt.Errorf("error: OpProcessStart: no such request type defined: %v" + m)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
} }
@ -478,7 +478,7 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str
txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode) txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
er := fmt.Errorf(txt) er := fmt.Errorf(txt)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
out = []byte(txt + "\n") out = []byte(txt + "\n")
newReplyMessage(proc, message, out) newReplyMessage(proc, message, out)
@ -523,7 +523,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri
if v := len(message.MethodArgs); v != 3 { if v := len(message.MethodArgs); v != 3 {
er := fmt.Errorf("error: methodREQOpProcessStop: got <4 number methodArgs, want: method,node,kind") er := fmt.Errorf("error: methodREQOpProcessStop: got <4 number methodArgs, want: method,node,kind")
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
} }
methodString := message.MethodArgs[0] methodString := message.MethodArgs[0]
@ -534,7 +534,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri
tmpH := mt.getHandler(Method(method)) tmpH := mt.getHandler(Method(method))
if tmpH == nil { if tmpH == nil {
er := fmt.Errorf("error: OpProcessStop: no such request type defined: %v, check that the methodArgs are correct: " + methodString) er := fmt.Errorf("error: OpProcessStop: no such request type defined: %v, check that the methodArgs are correct: " + methodString)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
} }
@ -560,7 +560,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri
err := toStopProc.natsSubscription.Unsubscribe() err := toStopProc.natsSubscription.Unsubscribe()
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQOpStopProcess failed to stop nats.Subscription: %v, methodArgs: %v", err, message.MethodArgs) er := fmt.Errorf("error: methodREQOpStopProcess failed to stop nats.Subscription: %v, methodArgs: %v", err, message.MethodArgs)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
} }
// Remove the prometheus label // Remove the prometheus label
@ -568,7 +568,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri
txt := fmt.Sprintf("info: OpProcessStop: process stopped id: %v, method: %v on: %v", toStopProc.processID, sub, message.ToNode) txt := fmt.Sprintf("info: OpProcessStop: process stopped id: %v, method: %v on: %v", toStopProc.processID, sub, message.ToNode)
er := fmt.Errorf(txt) er := fmt.Errorf(txt)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
out = []byte(txt + "\n") out = []byte(txt + "\n")
newReplyMessage(proc, message, out) newReplyMessage(proc, message, out)
@ -576,7 +576,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri
} else { } else {
txt := fmt.Sprintf("error: OpProcessStop: did not find process to stop: %v on %v", sub, message.ToNode) txt := fmt.Sprintf("error: OpProcessStop: did not find process to stop: %v on %v", sub, message.ToNode)
er := fmt.Errorf(txt) er := fmt.Errorf(txt)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
out = []byte(txt + "\n") out = []byte(txt + "\n")
newReplyMessage(proc, message, out) newReplyMessage(proc, message, out)
@ -613,11 +613,11 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin
err := os.MkdirAll(folderTree, 0700) err := os.MkdirAll(folderTree, 0700)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQToFileAppend: failed to create toFileAppend directory tree:%v, subject: %v, %v", folderTree, proc.subject, err) er := fmt.Errorf("error: methodREQToFileAppend: failed to create toFileAppend directory tree:%v, subject: %v, %v", folderTree, proc.subject, err)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
} }
er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree) er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
proc.processes.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
} }
// Open file and write data. // Open file and write data.
@ -625,7 +625,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin
f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600) f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQToFileAppend.handler: failed to open file: %v, %v", file, err) er := fmt.Errorf("error: methodREQToFileAppend.handler: failed to open file: %v, %v", file, err)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return nil, err return nil, err
} }
defer f.Close() defer f.Close()
@ -634,7 +634,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin
f.Sync() f.Sync()
if err != nil { if err != nil {
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file : %v, %v", file, err) er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file : %v, %v", file, err)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
} }
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
@ -664,13 +664,13 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([]
err := os.MkdirAll(folderTree, 0700) err := os.MkdirAll(folderTree, 0700)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQToFile failed to create toFile directory tree: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err) er := fmt.Errorf("error: methodREQToFile failed to create toFile directory tree: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return nil, er return nil, er
} }
er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree) er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
proc.processes.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
} }
// Open file and write data. // Open file and write data.
@ -678,7 +678,7 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([]
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755) f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err) er := fmt.Errorf("error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return nil, err return nil, err
} }
@ -688,7 +688,7 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([]
f.Sync() f.Sync()
if err != nil { if err != nil {
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: file: %v, %v", file, err) er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: file: %v, %v", file, err)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
} }
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
@ -716,7 +716,7 @@ func (m methodREQCopyFileFrom) handler(proc process, message Message, node strin
switch { switch {
case len(message.MethodArgs) < 3: case len(message.MethodArgs) < 3:
er := fmt.Errorf("error: methodREQCopyFileFrom: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath") er := fmt.Errorf("error: methodREQCopyFileFrom: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath")
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
} }
@ -743,11 +743,11 @@ func (m methodREQCopyFileFrom) handler(proc process, message Message, node strin
select { select {
case <-ctx.Done(): case <-ctx.Done():
er := fmt.Errorf("error: methodREQCopyFile: got <-ctx.Done(): %v", message.MethodArgs) er := fmt.Errorf("error: methodREQCopyFile: got <-ctx.Done(): %v", message.MethodArgs)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
case er := <-errCh: case er := <-errCh:
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
case out := <-outCh: case out := <-outCh:
@ -771,7 +771,7 @@ func (m methodREQCopyFileFrom) handler(proc process, message Message, node strin
sam, err := newSubjectAndMessage(msg) sam, err := newSubjectAndMessage(msg)
if err != nil { if err != nil {
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
} }
proc.toRingbufferCh <- []subjectAndMessage{sam} proc.toRingbufferCh <- []subjectAndMessage{sam}
@ -868,7 +868,7 @@ func (m methodREQCopyFileTo) handler(proc process, message Message, node string)
switch { switch {
case len(message.MethodArgs) < 3: case len(message.MethodArgs) < 3:
er := fmt.Errorf("error: methodREQCopyFileTo: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath") er := fmt.Errorf("error: methodREQCopyFileTo: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath")
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
} }
@ -892,7 +892,7 @@ func (m methodREQCopyFileTo) handler(proc process, message Message, node string)
{ {
er := fmt.Errorf("info: MethodREQCopyFileTo: Creating folders %v", dstDir) er := fmt.Errorf("info: MethodREQCopyFileTo: Creating folders %v", dstDir)
proc.processes.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
} }
} }
@ -924,12 +924,12 @@ func (m methodREQCopyFileTo) handler(proc process, message Message, node string)
select { select {
case <-ctx.Done(): case <-ctx.Done():
er := fmt.Errorf("error: methodREQCopyFileTo: got <-ctx.Done(): %v", message.MethodArgs) er := fmt.Errorf("error: methodREQCopyFileTo: got <-ctx.Done(): %v", message.MethodArgs)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
case err := <-errCh: case err := <-errCh:
er := fmt.Errorf("error: methodREQCopyFileTo: %v", err) er := fmt.Errorf("error: methodREQCopyFileTo: %v", err)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
case out := <-outCh: case out := <-outCh:
@ -968,7 +968,7 @@ func (m methodREQHello) handler(proc process, message Message, node string) ([]b
} }
er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree) er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
proc.processes.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
} }
// Open file and write data. // Open file and write data.
@ -986,7 +986,7 @@ func (m methodREQHello) handler(proc process, message Message, node string) ([]b
f.Sync() f.Sync()
if err != nil { if err != nil {
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: %v", err) er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: %v", err)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
} }
// -------------------------- // --------------------------
@ -1025,7 +1025,7 @@ func (m methodREQErrorLog) handler(proc process, message Message, node string) (
} }
er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree) er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
proc.processes.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
} }
// Open file and write data. // Open file and write data.
@ -1041,7 +1041,7 @@ func (m methodREQErrorLog) handler(proc process, message Message, node string) (
f.Sync() f.Sync()
if err != nil { if err != nil {
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: %v", err) er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: %v", err)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
} }
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
@ -1071,13 +1071,13 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by
err := os.MkdirAll(folderTree, 0700) err := os.MkdirAll(folderTree, 0700)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQPing.handler: failed to create toFile directory tree: %v, %v", folderTree, err) er := fmt.Errorf("error: methodREQPing.handler: failed to create toFile directory tree: %v, %v", folderTree, err)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return nil, er return nil, er
} }
er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree) er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
proc.processes.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
} }
// Open file. // Open file.
@ -1085,7 +1085,7 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755) f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQPing.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err) er := fmt.Errorf("error: methodREQPing.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return nil, err return nil, err
} }
@ -1097,7 +1097,7 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by
f.Sync() f.Sync()
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQPing.handler: failed to write to file: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err) er := fmt.Errorf("error: methodREQPing.handler: failed to write to file: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
} }
proc.processes.wg.Add(1) proc.processes.wg.Add(1)
@ -1134,13 +1134,13 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by
err := os.MkdirAll(folderTree, 0700) err := os.MkdirAll(folderTree, 0700)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQPong.handler: failed to create toFile directory tree %v: %v", folderTree, err) er := fmt.Errorf("error: methodREQPong.handler: failed to create toFile directory tree %v: %v", folderTree, err)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return nil, er return nil, er
} }
er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree) er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
proc.processes.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
} }
// Open file. // Open file.
@ -1148,7 +1148,7 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755) f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQPong.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err) er := fmt.Errorf("error: methodREQPong.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return nil, err return nil, err
} }
@ -1160,7 +1160,7 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by
f.Sync() f.Sync()
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQPong.handler: failed to write to file: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err) er := fmt.Errorf("error: methodREQPong.handler: failed to write to file: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
} }
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
@ -1182,7 +1182,7 @@ func (m methodREQCliCommand) getKind() Event {
// as a new message. // as a new message.
func (m methodREQCliCommand) handler(proc process, message Message, node string) ([]byte, error) { func (m methodREQCliCommand) handler(proc process, message Message, node string) ([]byte, error) {
inf := fmt.Errorf("<--- CLICommandREQUEST received from: %v, containing: %v", message.FromNode, message.MethodArgs) inf := fmt.Errorf("<--- CLICommandREQUEST received from: %v, containing: %v", message.FromNode, message.MethodArgs)
proc.processes.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration) proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
// Execute the CLI command in it's own go routine, so we are able // Execute the CLI command in it's own go routine, so we are able
// to return immediately with an ack reply that the messag was // to return immediately with an ack reply that the messag was
@ -1197,7 +1197,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
switch { switch {
case len(message.MethodArgs) < 1: case len(message.MethodArgs) < 1:
er := fmt.Errorf("error: methodREQCliCommand: got <1 number methodArgs") er := fmt.Errorf("error: methodREQCliCommand: got <1 number methodArgs")
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
case len(message.MethodArgs) >= 0: case len(message.MethodArgs) >= 0:
@ -1247,7 +1247,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
err := cmd.Run() err := cmd.Run()
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQCliCommand: cmd.Run failed : %v, methodArgs: %v, error_output: %v", err, message.MethodArgs, stderr.String()) er := fmt.Errorf("error: methodREQCliCommand: cmd.Run failed : %v, methodArgs: %v, error_output: %v", err, message.MethodArgs, stderr.String())
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
} }
select { select {
@ -1261,7 +1261,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
case <-ctx.Done(): case <-ctx.Done():
cancel() cancel()
er := fmt.Errorf("error: methodREQCliCommand: method timed out: %v", message.MethodArgs) er := fmt.Errorf("error: methodREQCliCommand: method timed out: %v", message.MethodArgs)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
case out := <-outCh: case out := <-outCh:
cancel() cancel()
@ -1305,7 +1305,7 @@ func (m methodREQToConsole) handler(proc process, message Message, node string)
proc.processes.tui.toConsoleCh <- message.Data proc.processes.tui.toConsoleCh <- message.Data
} else { } else {
er := fmt.Errorf("error: no tui client started") er := fmt.Errorf("error: no tui client started")
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
} }
default: default:
fmt.Fprintf(os.Stdout, "%v", string(message.Data)) fmt.Fprintf(os.Stdout, "%v", string(message.Data))
@ -1334,7 +1334,7 @@ func (m methodREQTuiToConsole) handler(proc process, message Message, node strin
proc.processes.tui.toConsoleCh <- message.Data proc.processes.tui.toConsoleCh <- message.Data
} else { } else {
er := fmt.Errorf("error: no tui client started") er := fmt.Errorf("error: no tui client started")
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
} }
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
@ -1354,7 +1354,7 @@ func (m methodREQHttpGet) getKind() Event {
// handler to do a Http Get. // handler to do a Http Get.
func (m methodREQHttpGet) handler(proc process, message Message, node string) ([]byte, error) { func (m methodREQHttpGet) handler(proc process, message Message, node string) ([]byte, error) {
inf := fmt.Errorf("<--- REQHttpGet received from: %v, containing: %v", message.FromNode, message.Data) inf := fmt.Errorf("<--- REQHttpGet received from: %v, containing: %v", message.FromNode, message.Data)
proc.processes.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration) proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
proc.processes.wg.Add(1) proc.processes.wg.Add(1)
go func() { go func() {
@ -1363,7 +1363,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
switch { switch {
case len(message.MethodArgs) < 1: case len(message.MethodArgs) < 1:
er := fmt.Errorf("error: methodREQHttpGet: got <1 number methodArgs") er := fmt.Errorf("error: methodREQHttpGet: got <1 number methodArgs")
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
} }
@ -1380,7 +1380,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, bailing out: %v", err, message.MethodArgs) er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, bailing out: %v", err, message.MethodArgs)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
cancel() cancel()
return return
} }
@ -1394,7 +1394,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
resp, err := client.Do(req) resp, err := client.Do(req)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, bailing out: %v", err, message.MethodArgs) er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, bailing out: %v", err, message.MethodArgs)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
} }
defer resp.Body.Close() defer resp.Body.Close()
@ -1402,14 +1402,14 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
if resp.StatusCode != 200 { if resp.StatusCode != 200 {
cancel() cancel()
er := fmt.Errorf("error: methodREQHttpGet: not 200, were %#v, bailing out: %v", resp.StatusCode, message) er := fmt.Errorf("error: methodREQHttpGet: not 200, were %#v, bailing out: %v", resp.StatusCode, message)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
} }
body, err := io.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, methodArgs: %v", err, message.MethodArgs) er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, methodArgs: %v", err, message.MethodArgs)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
} }
out := body out := body
@ -1425,7 +1425,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
case <-ctx.Done(): case <-ctx.Done():
cancel() cancel()
er := fmt.Errorf("error: methodREQHttpGet: method timed out: %v", message.MethodArgs) er := fmt.Errorf("error: methodREQHttpGet: method timed out: %v", message.MethodArgs)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
case out := <-outCh: case out := <-outCh:
cancel() cancel()
@ -1454,7 +1454,7 @@ func (m methodREQHttpGetScheduled) getKind() Event {
// The second element of the MethodArgs slice holds the timer defined in seconds. // The second element of the MethodArgs slice holds the timer defined in seconds.
func (m methodREQHttpGetScheduled) handler(proc process, message Message, node string) ([]byte, error) { func (m methodREQHttpGetScheduled) handler(proc process, message Message, node string) ([]byte, error) {
inf := fmt.Errorf("<--- REQHttpGetScheduled received from: %v, containing: %v", message.FromNode, message.Data) inf := fmt.Errorf("<--- REQHttpGetScheduled received from: %v, containing: %v", message.FromNode, message.Data)
proc.processes.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration) proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
proc.processes.wg.Add(1) proc.processes.wg.Add(1)
go func() { go func() {
@ -1465,7 +1465,7 @@ func (m methodREQHttpGetScheduled) handler(proc process, message Message, node s
switch { switch {
case len(message.MethodArgs) < 3: case len(message.MethodArgs) < 3:
er := fmt.Errorf("error: methodREQHttpGet: got <3 number methodArgs. Want URL, Schedule Interval in seconds, and the total time in minutes the scheduler should run for") er := fmt.Errorf("error: methodREQHttpGet: got <3 number methodArgs. Want URL, Schedule Interval in seconds, and the total time in minutes the scheduler should run for")
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
} }
@ -1475,14 +1475,14 @@ func (m methodREQHttpGetScheduled) handler(proc process, message Message, node s
scheduleInterval, err := strconv.Atoi(message.MethodArgs[1]) scheduleInterval, err := strconv.Atoi(message.MethodArgs[1])
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQHttpGetScheduled: schedule interval value is not a valid int number defined as a string value seconds: %v, bailing out: %v", err, message.MethodArgs) er := fmt.Errorf("error: methodREQHttpGetScheduled: schedule interval value is not a valid int number defined as a string value seconds: %v, bailing out: %v", err, message.MethodArgs)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
} }
schedulerTotalTime, err := strconv.Atoi(message.MethodArgs[2]) schedulerTotalTime, err := strconv.Atoi(message.MethodArgs[2])
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQHttpGetScheduled: scheduler total time value is not a valid int number defined as a string value minutes: %v, bailing out: %v", err, message.MethodArgs) er := fmt.Errorf("error: methodREQHttpGetScheduled: scheduler total time value is not a valid int number defined as a string value minutes: %v, bailing out: %v", err, message.MethodArgs)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
} }
@ -1516,7 +1516,7 @@ func (m methodREQHttpGetScheduled) handler(proc process, message Message, node s
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, error: %v", err, message.MethodArgs) er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, error: %v", err, message.MethodArgs)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
cancel() cancel()
return return
} }
@ -1529,7 +1529,7 @@ func (m methodREQHttpGetScheduled) handler(proc process, message Message, node s
resp, err := client.Do(req) resp, err := client.Do(req)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, error: %v", err, message.MethodArgs) er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, error: %v", err, message.MethodArgs)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
} }
defer resp.Body.Close() defer resp.Body.Close()
@ -1537,14 +1537,14 @@ func (m methodREQHttpGetScheduled) handler(proc process, message Message, node s
if resp.StatusCode != 200 { if resp.StatusCode != 200 {
cancel() cancel()
er := fmt.Errorf("error: methodREQHttpGet: not 200, were %#v, error: %v", resp.StatusCode, message) er := fmt.Errorf("error: methodREQHttpGet: not 200, were %#v, error: %v", resp.StatusCode, message)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
} }
body, err := io.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, methodArgs: %v", err, message.MethodArgs) er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, methodArgs: %v", err, message.MethodArgs)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
} }
out := body out := body
@ -1575,7 +1575,7 @@ func (m methodREQHttpGetScheduled) handler(proc process, message Message, node s
// fmt.Printf(" * DEBUG: <-ctxScheduler.Done()\n") // fmt.Printf(" * DEBUG: <-ctxScheduler.Done()\n")
cancel() cancel()
er := fmt.Errorf("error: methodREQHttpGet: schedule context timed out: %v", message.MethodArgs) er := fmt.Errorf("error: methodREQHttpGet: schedule context timed out: %v", message.MethodArgs)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
case out := <-outCh: case out := <-outCh:
// Prepare and queue for sending a new message with the output // Prepare and queue for sending a new message with the output
@ -1605,7 +1605,7 @@ func (m methodREQTailFile) getKind() Event {
// as a new message. // as a new message.
func (m methodREQTailFile) handler(proc process, message Message, node string) ([]byte, error) { func (m methodREQTailFile) handler(proc process, message Message, node string) ([]byte, error) {
inf := fmt.Errorf("<--- TailFile REQUEST received from: %v, containing: %v", message.FromNode, message.Data) inf := fmt.Errorf("<--- TailFile REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
proc.processes.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration) proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
proc.processes.wg.Add(1) proc.processes.wg.Add(1)
go func() { go func() {
@ -1614,7 +1614,7 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) (
switch { switch {
case len(message.MethodArgs) < 1: case len(message.MethodArgs) < 1:
er := fmt.Errorf("error: methodREQTailFile: got <1 number methodArgs") er := fmt.Errorf("error: methodREQTailFile: got <1 number methodArgs")
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
} }
@ -1641,7 +1641,7 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) (
}}) }})
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQToTailFile: tailFile: %v", err) er := fmt.Errorf("error: methodREQToTailFile: tailFile: %v", err)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
} }
proc.processes.wg.Add(1) proc.processes.wg.Add(1)
@ -1667,7 +1667,7 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) (
// go routine. // go routine.
// close(t.Lines) // close(t.Lines)
er := fmt.Errorf("info: method timeout reached REQTailFile, canceling: %v", message.MethodArgs) er := fmt.Errorf("info: method timeout reached REQTailFile, canceling: %v", message.MethodArgs)
proc.processes.errorKernel.infoSend(proc, message, er) proc.errorKernel.infoSend(proc, message, er)
return return
case out := <-outCh: case out := <-outCh:
@ -1699,7 +1699,7 @@ func (m methodREQCliCommandCont) getKind() Event {
// back as it is generated, and not just when the command is finished. // back as it is generated, and not just when the command is finished.
func (m methodREQCliCommandCont) handler(proc process, message Message, node string) ([]byte, error) { func (m methodREQCliCommandCont) handler(proc process, message Message, node string) ([]byte, error) {
inf := fmt.Errorf("<--- CLInCommandCont REQUEST received from: %v, containing: %v", message.FromNode, message.Data) inf := fmt.Errorf("<--- CLInCommandCont REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
proc.processes.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration) proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
// Execute the CLI command in it's own go routine, so we are able // Execute the CLI command in it's own go routine, so we are able
// to return immediately with an ack reply that the message was // to return immediately with an ack reply that the message was
@ -1718,7 +1718,7 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
switch { switch {
case len(message.MethodArgs) < 1: case len(message.MethodArgs) < 1:
er := fmt.Errorf("error: methodREQCliCommand: got <1 number methodArgs") er := fmt.Errorf("error: methodREQCliCommand: got <1 number methodArgs")
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
case len(message.MethodArgs) >= 0: case len(message.MethodArgs) >= 0:
@ -1746,18 +1746,18 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
outReader, err := cmd.StdoutPipe() outReader, err := cmd.StdoutPipe()
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StdoutPipe failed : %v, methodArgs: %v", err, message.MethodArgs) er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StdoutPipe failed : %v, methodArgs: %v", err, message.MethodArgs)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
} }
ErrorReader, err := cmd.StderrPipe() ErrorReader, err := cmd.StderrPipe()
if err != nil { if err != nil {
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StderrPipe failed : %v, methodArgs: %v", err, message.MethodArgs) er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StderrPipe failed : %v, methodArgs: %v", err, message.MethodArgs)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
} }
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.Start failed : %v, methodArgs: %v", err, message.MethodArgs) er := fmt.Errorf("error: methodREQCliCommandCont: cmd.Start failed : %v, methodArgs: %v", err, message.MethodArgs)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
} }
go func() { go func() {
@ -1787,7 +1787,7 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
if err := cmd.Wait(); err != nil { if err := cmd.Wait(); err != nil {
er := fmt.Errorf("info: methodREQCliCommandCont: method timeout reached, canceled: methodArgs: %v, %v", message.MethodArgs, err) er := fmt.Errorf("info: methodREQCliCommandCont: method timeout reached, canceled: methodArgs: %v, %v", message.MethodArgs, err)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
} }
}() }()
@ -1798,7 +1798,7 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
case <-ctx.Done(): case <-ctx.Done():
cancel() cancel()
er := fmt.Errorf("info: methodREQCliCommandCont: method timeout reached, canceling: methodArgs: %v", message.MethodArgs) er := fmt.Errorf("info: methodREQCliCommandCont: method timeout reached, canceling: methodArgs: %v", message.MethodArgs)
proc.processes.errorKernel.infoSend(proc, message, er) proc.errorKernel.infoSend(proc, message, er)
return return
case out := <-outCh: case out := <-outCh:
// fmt.Printf(" * out: %v\n", string(out)) // fmt.Printf(" * out: %v\n", string(out))
@ -1870,7 +1870,7 @@ func (m methodREQRelayInitial) handler(proc process, message Message, node strin
switch { switch {
case len(message.MethodArgs) < 3: case len(message.MethodArgs) < 3:
er := fmt.Errorf("error: methodREQRelayInitial: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath") er := fmt.Errorf("error: methodREQRelayInitial: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath")
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
} }
@ -1903,11 +1903,11 @@ func (m methodREQRelayInitial) handler(proc process, message Message, node strin
select { select {
case <-ctx.Done(): case <-ctx.Done():
er := fmt.Errorf("error: methodREQRelayInitial: CopyFromFile: got <-ctx.Done(): %v", message.MethodArgs) er := fmt.Errorf("error: methodREQRelayInitial: CopyFromFile: got <-ctx.Done(): %v", message.MethodArgs)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
case er := <-errCh: case er := <-errCh:
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
case <-nothingCh: case <-nothingCh:
@ -1928,7 +1928,7 @@ func (m methodREQRelayInitial) handler(proc process, message Message, node strin
sam, err := newSubjectAndMessage(message) sam, err := newSubjectAndMessage(message)
if err != nil { if err != nil {
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
} }
proc.toRingbufferCh <- []subjectAndMessage{sam} proc.toRingbufferCh <- []subjectAndMessage{sam}
@ -1964,7 +1964,7 @@ func (m methodREQRelay) handler(proc process, message Message, node string) ([]b
sam, err := newSubjectAndMessage(message) sam, err := newSubjectAndMessage(message)
if err != nil { if err != nil {
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
proc.processes.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
} }