diff --git a/message_readers.go b/message_readers.go index 3d13e65..b11cb9b 100644 --- a/message_readers.go +++ b/message_readers.go @@ -103,14 +103,14 @@ func (s *server) readStartupFolder() { mh, ok := p.methodsAvailable.CheckIfExists(sams[i].Message.Method) if !ok { er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Event) - p.processes.errorKernel.errSend(p, sams[i].Message, er) + p.errorKernel.errSend(p, sams[i].Message, er) continue } _, err = mh.handler(p, sams[i].Message, s.nodeName) if err != nil { er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) - p.processes.errorKernel.errSend(p, sams[i].Message, er) + p.errorKernel.errSend(p, sams[i].Message, er) continue } } @@ -164,7 +164,7 @@ func (s *server) readSocket() { conn, err := s.StewardSocket.Accept() if err != nil { er := fmt.Errorf("error: failed to accept conn on socket: %v", err) - s.processes.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er) } go func(conn net.Conn) { @@ -177,7 +177,7 @@ func (s *server) readSocket() { _, err = conn.Read(b) if err != nil && err != io.EOF { er := fmt.Errorf("error: failed to read data from tcp listener: %v", err) - s.processes.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er) return } @@ -194,7 +194,7 @@ func (s *server) readSocket() { sams, err := s.convertBytesToSAMs(readBytes) if err != nil { er := fmt.Errorf("error: malformed json received on socket: %v", err) - s.processes.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er) return } @@ -207,7 +207,7 @@ func (s *server) readSocket() { // Send an info message to the central about the message picked // for auditing. er := fmt.Errorf("info: message read from socket on %v: %v", s.nodeName, sams[i].Message) - s.processes.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er) } // Send the SAM struct to be picked up by the ring buffer. @@ -233,7 +233,7 @@ func (s *server) readTCPListener() { conn, err := ln.Accept() if err != nil { er := fmt.Errorf("error: failed to accept conn on socket: %v", err) - s.processes.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er) continue } @@ -247,7 +247,7 @@ func (s *server) readTCPListener() { _, err = conn.Read(b) if err != nil && err != io.EOF { er := fmt.Errorf("error: failed to read data from tcp listener: %v", err) - s.processes.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er) return } @@ -264,7 +264,7 @@ func (s *server) readTCPListener() { sam, err := s.convertBytesToSAMs(readBytes) if err != nil { er := fmt.Errorf("error: malformed json received on tcp listener: %v", err) - s.processes.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er) return } @@ -291,7 +291,7 @@ func (s *server) readHTTPlistenerHandler(w http.ResponseWriter, r *http.Request) _, err := r.Body.Read(b) if err != nil && err != io.EOF { er := fmt.Errorf("error: failed to read data from tcp listener: %v", err) - s.processes.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er) return } @@ -308,7 +308,7 @@ func (s *server) readHTTPlistenerHandler(w http.ResponseWriter, r *http.Request) sam, err := s.convertBytesToSAMs(readBytes) if err != nil { er := fmt.Errorf("error: malformed json received on HTTPListener: %v", err) - s.processes.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er) return } @@ -376,7 +376,7 @@ func (s *server) convertBytesToSAMs(b []byte) ([]subjectAndMessage, error) { sm, err := newSubjectAndMessage(m) if err != nil { er := fmt.Errorf("error: newSubjectAndMessage: %v", err) - s.processes.errorKernel.errSend(s.processInitial, m, er) + s.errorKernel.errSend(s.processInitial, m, er) continue } @@ -420,7 +420,7 @@ func (s *server) checkMessageToNodes(MsgSlice []Message) []Message { // the slice since it is not valid. default: er := fmt.Errorf("error: no toNode or toNodes where specified in the message, dropping message: %v", v) - s.processes.errorKernel.errSend(s.processInitial, v, er) + s.errorKernel.errSend(s.processInitial, v, er) continue } } diff --git a/process.go b/process.go index a18c15f..c14fb03 100644 --- a/process.go +++ b/process.go @@ -99,9 +99,10 @@ type process struct { // startup holds the startup functions for starting up publisher // or subscriber processes startup *startup - // Signatures signatures *signatures + // errorKernel + errorKernel *errorKernel } // prepareNewProcess will set the the provided values and the default @@ -130,6 +131,7 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin ctxCancel: cancel, startup: newStartup(server), signatures: server.signatures, + errorKernel: server.errorKernel, } return proc @@ -175,7 +177,7 @@ func (p process) spawnWorker() { err := p.procFunc(p.ctx, p.procFuncCh) if err != nil { er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err) - p.processes.errorKernel.errSend(p, Message{}, er) + p.errorKernel.errSend(p, Message{}, er) } }() } @@ -198,7 +200,7 @@ func (p process) spawnWorker() { err := p.procFunc(p.ctx, p.procFuncCh) if err != nil { er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err) - p.processes.errorKernel.errSend(p, Message{}, er) + p.errorKernel.errSend(p, Message{}, er) } }() } @@ -285,7 +287,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He if err != nil { er := fmt.Errorf("error: ack receive failed: subject=%v: %v", p.subject.name(), err) // sendErrorLogMessage(p.toRingbufferCh, p.node, er) - p.processes.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) if err == nats.ErrNoResponders { // fmt.Printf(" * DEBUG: Waiting, ACKTimeout: %v\n", message.ACKTimeout) @@ -295,7 +297,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He // did not receive a reply, decide what to do.. retryAttempts++ er = fmt.Errorf("retry attempt:%v, retries: %v, ack timeout: %v, message.ID: %v", retryAttempts, message.Retries, message.ACKTimeout, message.ID) - p.processes.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) switch { //case message.Retries == 0: @@ -308,7 +310,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He // We do not want to send errorLogs for REQErrorLog type since // it will just cause an endless loop. if message.Method != REQErrorLog { - p.processes.errorKernel.infoSend(p, message, er) + p.errorKernel.infoSend(p, message, er) } subReply.Unsubscribe() @@ -319,7 +321,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He default: // none of the above matched, so we've not reached max retries yet er := fmt.Errorf("max retries for message not reached, retrying sending of message with ID %v", message.ID) - p.processes.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) p.server.metrics.promNatsMessagesMissedACKsTotal.Inc() @@ -360,7 +362,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, // If debugging is enabled, print the source node name of the nats messages received. if val, ok := msg.Header["fromNode"]; ok { er := fmt.Errorf("info: nats message received from %v, with subject %v ", val, subject) - p.processes.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) } // If compression is used, decompress it to get the gob data. If @@ -372,13 +374,13 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, zr, err := zstd.NewReader(nil) if err != nil { er := fmt.Errorf("error: zstd NewReader failed: %v", err) - p.processes.errorKernel.errSend(p, Message{}, er) + p.errorKernel.errSend(p, Message{}, er) return } msgData, err = zr.DecodeAll(msg.Data, nil) if err != nil { er := fmt.Errorf("error: zstd decoding failed: %v", err) - p.processes.errorKernel.errSend(p, Message{}, er) + p.errorKernel.errSend(p, Message{}, er) zr.Close() return } @@ -390,14 +392,14 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, gr, err := gzip.NewReader(r) if err != nil { er := fmt.Errorf("error: gzip NewReader failed: %v", err) - p.processes.errorKernel.errSend(p, Message{}, er) + p.errorKernel.errSend(p, Message{}, er) return } b, err := io.ReadAll(gr) if err != nil { er := fmt.Errorf("error: gzip ReadAll failed: %v", err) - p.processes.errorKernel.errSend(p, Message{}, er) + p.errorKernel.errSend(p, Message{}, er) return } @@ -418,7 +420,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, err := cbor.Unmarshal(msgData, &message) if err != nil { er := fmt.Errorf("error: cbor decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err) - p.processes.errorKernel.errSend(p, message, er) + p.errorKernel.errSend(p, message, er) return } default: // Deaults to gob if no match was found. @@ -428,7 +430,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, err := gobDec.Decode(&message) if err != nil { er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err) - p.processes.errorKernel.errSend(p, message, er) + p.errorKernel.errSend(p, message, er) return } } @@ -441,7 +443,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, err := gobDec.Decode(&message) if err != nil { er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err) - p.processes.errorKernel.errSend(p, message, er) + p.errorKernel.errSend(p, message, er) return } } @@ -464,7 +466,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, switch { case msgCopy.PreviousMessage.RelayReplyMethod == "": er := fmt.Errorf("error: subscriberHandler: no PreviousMessage.RelayReplyMethod found, defaulting to the reply method of previous message: %v ", msgCopy) - p.processes.errorKernel.errSend(p, message, er) + p.errorKernel.errSend(p, message, er) msgCopy.Method = msgCopy.PreviousMessage.ReplyMethod @@ -481,7 +483,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, sam, err := newSubjectAndMessage(msgCopy) if err != nil { er := fmt.Errorf("error: subscriberHandler: newSubjectAndMessage : %v, message copy: %v", err, msgCopy) - p.processes.errorKernel.errSend(p, message, er) + p.errorKernel.errSend(p, message, er) } p.toRingbufferCh <- []subjectAndMessage{sam} @@ -509,7 +511,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, mh, ok := p.methodsAvailable.CheckIfExists(message.Method) if !ok { er := fmt.Errorf("error: subscriberHandler: no such method type: %v", p.subject.Event) - p.processes.errorKernel.errSend(p, message, er) + p.errorKernel.errSend(p, message, er) } out := []byte{} @@ -520,7 +522,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, out, err = mh.handler(p, message, thisNode) if err != nil { er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) - p.processes.errorKernel.errSend(p, message, er) + p.errorKernel.errSend(p, message, er) } } @@ -532,7 +534,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, mf, ok := p.methodsAvailable.CheckIfExists(message.Method) if !ok { er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Event) - p.processes.errorKernel.errSend(p, message, er) + p.errorKernel.errSend(p, message, er) } if p.signatures.verifySignature(message) { @@ -541,13 +543,13 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, if err != nil { er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) - p.processes.errorKernel.errSend(p, message, er) + p.errorKernel.errSend(p, message, er) } } default: er := fmt.Errorf("info: did not find that specific type of event: %#v", p.subject.Event) - p.processes.errorKernel.infoSend(p, message, er) + p.errorKernel.infoSend(p, message, er) } } @@ -642,7 +644,7 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once, b, err := cbor.Marshal(m) if err != nil { er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err) - p.processes.errorKernel.errSend(p, m, er) + p.errorKernel.errSend(p, m, er) return } @@ -655,7 +657,7 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once, err := gobEnc.Encode(m) if err != nil { er := fmt.Errorf("error: messageDeliverNats: gob encode message failed: %v", err) - p.processes.errorKernel.errSend(p, m, er) + p.errorKernel.errSend(p, m, er) return } @@ -711,7 +713,7 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once, // We only wan't to send the error message to errorCentral once. once.Do(func() { - p.processes.errorKernel.errSend(p, m, er) + p.errorKernel.errSend(p, m, er) }) // No compression, so we just assign the value of the serialized diff --git a/processes.go b/processes.go index e3ff886..25ce721 100644 --- a/processes.go +++ b/processes.go @@ -274,7 +274,7 @@ func (s startup) pubREQHello(p process) { sam, err := newSubjectAndMessage(m) if err != nil { // In theory the system should drop the message before it reaches here. - p.processes.errorKernel.errSend(p, m, err) + p.errorKernel.errSend(p, m, err) log.Printf("error: ProcessesStart: %v\n", err) } proc.toRingbufferCh <- []subjectAndMessage{sam} diff --git a/requests.go b/requests.go index e14b9ef..c734212 100644 --- a/requests.go +++ b/requests.go @@ -354,7 +354,7 @@ func newReplyMessage(proc process, message Message, outData []byte) { if err != nil { // In theory the system should drop the message before it reaches here. er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) } proc.toRingbufferCh <- []subjectAndMessage{sam} @@ -458,7 +458,7 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str switch { case len(message.MethodArgs) < 1: er := fmt.Errorf("error: methodREQOpProcessStart: got <1 number methodArgs") - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return } @@ -467,7 +467,7 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str tmpH := mt.getHandler(Method(method)) if tmpH == nil { er := fmt.Errorf("error: OpProcessStart: no such request type defined: %v" + m) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return } @@ -478,7 +478,7 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode) er := fmt.Errorf(txt) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) out = []byte(txt + "\n") newReplyMessage(proc, message, out) @@ -523,7 +523,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri if v := len(message.MethodArgs); v != 3 { er := fmt.Errorf("error: methodREQOpProcessStop: got <4 number methodArgs, want: method,node,kind") - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) } methodString := message.MethodArgs[0] @@ -534,7 +534,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri tmpH := mt.getHandler(Method(method)) if tmpH == nil { er := fmt.Errorf("error: OpProcessStop: no such request type defined: %v, check that the methodArgs are correct: " + methodString) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return } @@ -560,7 +560,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri err := toStopProc.natsSubscription.Unsubscribe() if err != nil { er := fmt.Errorf("error: methodREQOpStopProcess failed to stop nats.Subscription: %v, methodArgs: %v", err, message.MethodArgs) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) } // Remove the prometheus label @@ -568,7 +568,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri txt := fmt.Sprintf("info: OpProcessStop: process stopped id: %v, method: %v on: %v", toStopProc.processID, sub, message.ToNode) er := fmt.Errorf(txt) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) out = []byte(txt + "\n") newReplyMessage(proc, message, out) @@ -576,7 +576,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri } else { txt := fmt.Sprintf("error: OpProcessStop: did not find process to stop: %v on %v", sub, message.ToNode) er := fmt.Errorf(txt) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) out = []byte(txt + "\n") newReplyMessage(proc, message, out) @@ -613,11 +613,11 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin err := os.MkdirAll(folderTree, 0700) if err != nil { er := fmt.Errorf("error: methodREQToFileAppend: failed to create toFileAppend directory tree:%v, subject: %v, %v", folderTree, proc.subject, err) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) } er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree) - proc.processes.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) } // Open file and write data. @@ -625,7 +625,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600) if err != nil { er := fmt.Errorf("error: methodREQToFileAppend.handler: failed to open file: %v, %v", file, err) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return nil, err } defer f.Close() @@ -634,7 +634,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin f.Sync() if err != nil { er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file : %v, %v", file, err) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) } ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) @@ -664,13 +664,13 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([] err := os.MkdirAll(folderTree, 0700) if err != nil { er := fmt.Errorf("error: methodREQToFile failed to create toFile directory tree: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return nil, er } er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree) - proc.processes.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) } // Open file and write data. @@ -678,7 +678,7 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([] f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755) if err != nil { er := fmt.Errorf("error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return nil, err } @@ -688,7 +688,7 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([] f.Sync() if err != nil { er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: file: %v, %v", file, err) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) } ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) @@ -716,7 +716,7 @@ func (m methodREQCopyFileFrom) handler(proc process, message Message, node strin switch { case len(message.MethodArgs) < 3: er := fmt.Errorf("error: methodREQCopyFileFrom: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath") - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return } @@ -743,11 +743,11 @@ func (m methodREQCopyFileFrom) handler(proc process, message Message, node strin select { case <-ctx.Done(): er := fmt.Errorf("error: methodREQCopyFile: got <-ctx.Done(): %v", message.MethodArgs) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return case er := <-errCh: - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return case out := <-outCh: @@ -771,7 +771,7 @@ func (m methodREQCopyFileFrom) handler(proc process, message Message, node strin sam, err := newSubjectAndMessage(msg) if err != nil { er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) } proc.toRingbufferCh <- []subjectAndMessage{sam} @@ -868,7 +868,7 @@ func (m methodREQCopyFileTo) handler(proc process, message Message, node string) switch { case len(message.MethodArgs) < 3: er := fmt.Errorf("error: methodREQCopyFileTo: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath") - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return } @@ -892,7 +892,7 @@ func (m methodREQCopyFileTo) handler(proc process, message Message, node string) { er := fmt.Errorf("info: MethodREQCopyFileTo: Creating folders %v", dstDir) - proc.processes.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) } } @@ -924,12 +924,12 @@ func (m methodREQCopyFileTo) handler(proc process, message Message, node string) select { case <-ctx.Done(): er := fmt.Errorf("error: methodREQCopyFileTo: got <-ctx.Done(): %v", message.MethodArgs) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return case err := <-errCh: er := fmt.Errorf("error: methodREQCopyFileTo: %v", err) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return case out := <-outCh: @@ -968,7 +968,7 @@ func (m methodREQHello) handler(proc process, message Message, node string) ([]b } er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree) - proc.processes.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) } // Open file and write data. @@ -986,7 +986,7 @@ func (m methodREQHello) handler(proc process, message Message, node string) ([]b f.Sync() if err != nil { er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: %v", err) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) } // -------------------------- @@ -1025,7 +1025,7 @@ func (m methodREQErrorLog) handler(proc process, message Message, node string) ( } er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree) - proc.processes.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) } // Open file and write data. @@ -1041,7 +1041,7 @@ func (m methodREQErrorLog) handler(proc process, message Message, node string) ( f.Sync() if err != nil { er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: %v", err) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) } ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) @@ -1071,13 +1071,13 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by err := os.MkdirAll(folderTree, 0700) if err != nil { er := fmt.Errorf("error: methodREQPing.handler: failed to create toFile directory tree: %v, %v", folderTree, err) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return nil, er } er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree) - proc.processes.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) } // Open file. @@ -1085,7 +1085,7 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755) if err != nil { er := fmt.Errorf("error: methodREQPing.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return nil, err } @@ -1097,7 +1097,7 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by f.Sync() if err != nil { er := fmt.Errorf("error: methodREQPing.handler: failed to write to file: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) } proc.processes.wg.Add(1) @@ -1134,13 +1134,13 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by err := os.MkdirAll(folderTree, 0700) if err != nil { er := fmt.Errorf("error: methodREQPong.handler: failed to create toFile directory tree %v: %v", folderTree, err) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return nil, er } er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree) - proc.processes.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) + proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) } // Open file. @@ -1148,7 +1148,7 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755) if err != nil { er := fmt.Errorf("error: methodREQPong.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return nil, err } @@ -1160,7 +1160,7 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by f.Sync() if err != nil { er := fmt.Errorf("error: methodREQPong.handler: failed to write to file: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) } ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) @@ -1182,7 +1182,7 @@ func (m methodREQCliCommand) getKind() Event { // as a new message. func (m methodREQCliCommand) handler(proc process, message Message, node string) ([]byte, error) { inf := fmt.Errorf("<--- CLICommandREQUEST received from: %v, containing: %v", message.FromNode, message.MethodArgs) - proc.processes.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration) + proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration) // Execute the CLI command in it's own go routine, so we are able // to return immediately with an ack reply that the messag was @@ -1197,7 +1197,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string) switch { case len(message.MethodArgs) < 1: er := fmt.Errorf("error: methodREQCliCommand: got <1 number methodArgs") - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return case len(message.MethodArgs) >= 0: @@ -1247,7 +1247,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string) err := cmd.Run() if err != nil { er := fmt.Errorf("error: methodREQCliCommand: cmd.Run failed : %v, methodArgs: %v, error_output: %v", err, message.MethodArgs, stderr.String()) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) } select { @@ -1261,7 +1261,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string) case <-ctx.Done(): cancel() er := fmt.Errorf("error: methodREQCliCommand: method timed out: %v", message.MethodArgs) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) case out := <-outCh: cancel() @@ -1305,7 +1305,7 @@ func (m methodREQToConsole) handler(proc process, message Message, node string) proc.processes.tui.toConsoleCh <- message.Data } else { er := fmt.Errorf("error: no tui client started") - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) } default: fmt.Fprintf(os.Stdout, "%v", string(message.Data)) @@ -1334,7 +1334,7 @@ func (m methodREQTuiToConsole) handler(proc process, message Message, node strin proc.processes.tui.toConsoleCh <- message.Data } else { er := fmt.Errorf("error: no tui client started") - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) } ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) @@ -1354,7 +1354,7 @@ func (m methodREQHttpGet) getKind() Event { // handler to do a Http Get. func (m methodREQHttpGet) handler(proc process, message Message, node string) ([]byte, error) { inf := fmt.Errorf("<--- REQHttpGet received from: %v, containing: %v", message.FromNode, message.Data) - proc.processes.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration) + proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration) proc.processes.wg.Add(1) go func() { @@ -1363,7 +1363,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([ switch { case len(message.MethodArgs) < 1: er := fmt.Errorf("error: methodREQHttpGet: got <1 number methodArgs") - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return } @@ -1380,7 +1380,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([ req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, bailing out: %v", err, message.MethodArgs) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) cancel() return } @@ -1394,7 +1394,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([ resp, err := client.Do(req) if err != nil { er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, bailing out: %v", err, message.MethodArgs) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return } defer resp.Body.Close() @@ -1402,14 +1402,14 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([ if resp.StatusCode != 200 { cancel() er := fmt.Errorf("error: methodREQHttpGet: not 200, were %#v, bailing out: %v", resp.StatusCode, message) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return } body, err := io.ReadAll(resp.Body) if err != nil { er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, methodArgs: %v", err, message.MethodArgs) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) } out := body @@ -1425,7 +1425,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([ case <-ctx.Done(): cancel() er := fmt.Errorf("error: methodREQHttpGet: method timed out: %v", message.MethodArgs) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) case out := <-outCh: cancel() @@ -1454,7 +1454,7 @@ func (m methodREQHttpGetScheduled) getKind() Event { // The second element of the MethodArgs slice holds the timer defined in seconds. func (m methodREQHttpGetScheduled) handler(proc process, message Message, node string) ([]byte, error) { inf := fmt.Errorf("<--- REQHttpGetScheduled received from: %v, containing: %v", message.FromNode, message.Data) - proc.processes.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration) + proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration) proc.processes.wg.Add(1) go func() { @@ -1465,7 +1465,7 @@ func (m methodREQHttpGetScheduled) handler(proc process, message Message, node s switch { case len(message.MethodArgs) < 3: er := fmt.Errorf("error: methodREQHttpGet: got <3 number methodArgs. Want URL, Schedule Interval in seconds, and the total time in minutes the scheduler should run for") - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return } @@ -1475,14 +1475,14 @@ func (m methodREQHttpGetScheduled) handler(proc process, message Message, node s scheduleInterval, err := strconv.Atoi(message.MethodArgs[1]) if err != nil { er := fmt.Errorf("error: methodREQHttpGetScheduled: schedule interval value is not a valid int number defined as a string value seconds: %v, bailing out: %v", err, message.MethodArgs) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return } schedulerTotalTime, err := strconv.Atoi(message.MethodArgs[2]) if err != nil { er := fmt.Errorf("error: methodREQHttpGetScheduled: scheduler total time value is not a valid int number defined as a string value minutes: %v, bailing out: %v", err, message.MethodArgs) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return } @@ -1516,7 +1516,7 @@ func (m methodREQHttpGetScheduled) handler(proc process, message Message, node s req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, error: %v", err, message.MethodArgs) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) cancel() return } @@ -1529,7 +1529,7 @@ func (m methodREQHttpGetScheduled) handler(proc process, message Message, node s resp, err := client.Do(req) if err != nil { er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, error: %v", err, message.MethodArgs) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return } defer resp.Body.Close() @@ -1537,14 +1537,14 @@ func (m methodREQHttpGetScheduled) handler(proc process, message Message, node s if resp.StatusCode != 200 { cancel() er := fmt.Errorf("error: methodREQHttpGet: not 200, were %#v, error: %v", resp.StatusCode, message) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return } body, err := io.ReadAll(resp.Body) if err != nil { er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, methodArgs: %v", err, message.MethodArgs) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) } out := body @@ -1575,7 +1575,7 @@ func (m methodREQHttpGetScheduled) handler(proc process, message Message, node s // fmt.Printf(" * DEBUG: <-ctxScheduler.Done()\n") cancel() er := fmt.Errorf("error: methodREQHttpGet: schedule context timed out: %v", message.MethodArgs) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return case out := <-outCh: // Prepare and queue for sending a new message with the output @@ -1605,7 +1605,7 @@ func (m methodREQTailFile) getKind() Event { // as a new message. func (m methodREQTailFile) handler(proc process, message Message, node string) ([]byte, error) { inf := fmt.Errorf("<--- TailFile REQUEST received from: %v, containing: %v", message.FromNode, message.Data) - proc.processes.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration) + proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration) proc.processes.wg.Add(1) go func() { @@ -1614,7 +1614,7 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) ( switch { case len(message.MethodArgs) < 1: er := fmt.Errorf("error: methodREQTailFile: got <1 number methodArgs") - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return } @@ -1641,7 +1641,7 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) ( }}) if err != nil { er := fmt.Errorf("error: methodREQToTailFile: tailFile: %v", err) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) } proc.processes.wg.Add(1) @@ -1667,7 +1667,7 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) ( // go routine. // close(t.Lines) er := fmt.Errorf("info: method timeout reached REQTailFile, canceling: %v", message.MethodArgs) - proc.processes.errorKernel.infoSend(proc, message, er) + proc.errorKernel.infoSend(proc, message, er) return case out := <-outCh: @@ -1699,7 +1699,7 @@ func (m methodREQCliCommandCont) getKind() Event { // back as it is generated, and not just when the command is finished. func (m methodREQCliCommandCont) handler(proc process, message Message, node string) ([]byte, error) { inf := fmt.Errorf("<--- CLInCommandCont REQUEST received from: %v, containing: %v", message.FromNode, message.Data) - proc.processes.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration) + proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration) // Execute the CLI command in it's own go routine, so we are able // to return immediately with an ack reply that the message was @@ -1718,7 +1718,7 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str switch { case len(message.MethodArgs) < 1: er := fmt.Errorf("error: methodREQCliCommand: got <1 number methodArgs") - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return case len(message.MethodArgs) >= 0: @@ -1746,18 +1746,18 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str outReader, err := cmd.StdoutPipe() if err != nil { er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StdoutPipe failed : %v, methodArgs: %v", err, message.MethodArgs) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) } ErrorReader, err := cmd.StderrPipe() if err != nil { er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StderrPipe failed : %v, methodArgs: %v", err, message.MethodArgs) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) } if err := cmd.Start(); err != nil { er := fmt.Errorf("error: methodREQCliCommandCont: cmd.Start failed : %v, methodArgs: %v", err, message.MethodArgs) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) } go func() { @@ -1787,7 +1787,7 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str if err := cmd.Wait(); err != nil { er := fmt.Errorf("info: methodREQCliCommandCont: method timeout reached, canceled: methodArgs: %v, %v", message.MethodArgs, err) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) } }() @@ -1798,7 +1798,7 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str case <-ctx.Done(): cancel() er := fmt.Errorf("info: methodREQCliCommandCont: method timeout reached, canceling: methodArgs: %v", message.MethodArgs) - proc.processes.errorKernel.infoSend(proc, message, er) + proc.errorKernel.infoSend(proc, message, er) return case out := <-outCh: // fmt.Printf(" * out: %v\n", string(out)) @@ -1870,7 +1870,7 @@ func (m methodREQRelayInitial) handler(proc process, message Message, node strin switch { case len(message.MethodArgs) < 3: er := fmt.Errorf("error: methodREQRelayInitial: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath") - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return } @@ -1903,11 +1903,11 @@ func (m methodREQRelayInitial) handler(proc process, message Message, node strin select { case <-ctx.Done(): er := fmt.Errorf("error: methodREQRelayInitial: CopyFromFile: got <-ctx.Done(): %v", message.MethodArgs) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return case er := <-errCh: - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return case <-nothingCh: @@ -1928,7 +1928,7 @@ func (m methodREQRelayInitial) handler(proc process, message Message, node strin sam, err := newSubjectAndMessage(message) if err != nil { er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) } proc.toRingbufferCh <- []subjectAndMessage{sam} @@ -1964,7 +1964,7 @@ func (m methodREQRelay) handler(proc process, message Message, node string) ([]b sam, err := newSubjectAndMessage(message) if err != nil { er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) - proc.processes.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er) return }