From 807455119cad271055857d361fd29836fba7ead3 Mon Sep 17 00:00:00 2001 From: postmannen Date: Wed, 11 Jan 2023 08:38:15 +0100 Subject: [PATCH] Added x/slog for stderr logging --- central_auth_key_handling.go | 6 ++-- errorkernel.go | 31 ++++++++++++++--- message_readers.go | 44 ++++++++++++------------ process.go | 32 ++++++++--------- processes.go | 6 ++-- requests.go | 2 +- requests_acl.go | 52 ++++++++++++++-------------- requests_cli.go | 16 ++++----- requests_copy.go | 66 ++++++++++++++++++------------------ requests_file_handling.go | 16 ++++----- requests_http.go | 28 +++++++-------- requests_keys.go | 22 ++++++------ requests_operator.go | 16 ++++----- requests_std.go | 20 +++++------ ringbuffer.go | 6 ++-- server.go | 4 +-- 16 files changed, 195 insertions(+), 172 deletions(-) diff --git a/central_auth_key_handling.go b/central_auth_key_handling.go index 8d1cc11..4de0e7a 100644 --- a/central_auth_key_handling.go +++ b/central_auth_key_handling.go @@ -157,7 +157,7 @@ func (c *centralAuth) deletePublicKeys(proc process, msg Message, nodes []string err := c.pki.dbDeletePublicKeys(c.pki.bucketNamePublicKeys, nodes) if err != nil { - proc.errorKernel.errSend(proc, msg, err) + proc.errorKernel.errSend(proc, msg, err, logWarning) } er := fmt.Errorf("info: detected new public key for node: %v. This key will need to be authorized by operator to be allowed into the system", msg.FromNode) @@ -287,7 +287,7 @@ func (c *centralAuth) updateHash(proc process, message Message) { b, err := cbor.Marshal(sortedNodesAndKeys) if err != nil { er := fmt.Errorf("error: methodREQKeysAllow, failed to marshal slice, and will not update hash for public keys: %v", err) - c.pki.errorKernel.errSend(proc, message, er) + c.pki.errorKernel.errSend(proc, message, er, logWarning) log.Printf(" * DEBUG: %v\n", er) return @@ -301,7 +301,7 @@ func (c *centralAuth) updateHash(proc process, message Message) { c.pki.dbUpdateHash(hash[:]) if err != nil { er := fmt.Errorf("error: methodREQKeysAllow, failed to store the hash into the db: %v", err) - c.pki.errorKernel.errSend(proc, message, er) + c.pki.errorKernel.errSend(proc, message, er, logWarning) log.Printf(" * DEBUG: %v\n", er) return diff --git a/errorkernel.go b/errorkernel.go index 2afc188..69ee7d1 100644 --- a/errorkernel.go +++ b/errorkernel.go @@ -52,6 +52,7 @@ func newErrorKernel(ctx context.Context, m *metrics, configuration *Configuratio type logLevel string +const logError logLevel = "error" const logInfo logLevel = "info" const logWarning logLevel = "warning" const logDebug logLevel = "debug" @@ -80,6 +81,11 @@ func (e *errorKernel) start(ringBufferBulkInCh chan<- []subjectAndMessage) error } switch { + case e.configuration.LogLevel == string(logError): + opts := slog.HandlerOptions{Level: slog.LevelError, + ReplaceAttr: replaceFunc} + slog.SetDefault(slog.New(opts.NewTextHandler(os.Stderr))) + case e.configuration.LogLevel == string(logInfo): opts := slog.HandlerOptions{Level: slog.LevelInfo, ReplaceAttr: replaceFunc} @@ -133,9 +139,23 @@ func (e *errorKernel) start(ringBufferBulkInCh chan<- []subjectAndMessage) error // Put the message on the channel to the ringbuffer. ringBufferBulkInCh <- []subjectAndMessage{sam} - if errEvent.process.configuration.EnableDebug { - log.Printf("%v\n", er) + // if errEvent.process.configuration.EnableDebug { + // log.Printf("%v\n", er) + // } + + switch errEvent.logLevel { + case logError: + slog.Error("%v\n", fmt.Errorf("%v", er), "error") + case logInfo: + slog.Info("%v\n", er) + case logWarning: + slog.Warn("%v\n", er) + case logDebug: + slog.Debug("%v\n", er) + case logNone: + // Do nothing for type logNone errors. } + } // Check the type of the error to decide what to do. @@ -186,7 +206,7 @@ func (e *errorKernel) start(ringBufferBulkInCh chan<- []subjectAndMessage) error } // We also want to log the error. - e.errSend(errEvent.process, errEvent.message, errEvent.err) + e.errSend(errEvent.process, errEvent.message, errEvent.err, logWarning) }() default: @@ -211,6 +231,8 @@ type errorEvent struct { process process // The message that where in progress when error occured message Message + // Level, the log level of the severity + logLevel logLevel } func (e errorEvent) Error() string { @@ -218,12 +240,13 @@ func (e errorEvent) Error() string { } // errSend will just send an error message to the errorCentral. -func (e *errorKernel) errSend(proc process, msg Message, err error) { +func (e *errorKernel) errSend(proc process, msg Message, err error, logLevel logLevel) { ev := errorEvent{ err: err, errorType: errTypeSendError, process: proc, message: msg, + logLevel: logLevel, // We don't want to create any actions when just // sending errors. // errorActionCh: make(chan errorAction), diff --git a/message_readers.go b/message_readers.go index a26f4fa..52c619c 100644 --- a/message_readers.go +++ b/message_readers.go @@ -34,7 +34,7 @@ func (s *server) readStartupFolder() { filePaths, err := s.getFilePaths(startupFolder) if err != nil { er := fmt.Errorf("error: readStartupFolder: unable to get filenames: %v", err) - s.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) return } @@ -64,7 +64,7 @@ func (s *server) readStartupFolder() { }(filePath) if err != nil { - s.errorKernel.errSend(s.processInitial, Message{}, err) + s.errorKernel.errSend(s.processInitial, Message{}, err, logWarning) continue } @@ -74,7 +74,7 @@ func (s *server) readStartupFolder() { sams, err := s.convertBytesToSAMs(readBytes) if err != nil { er := fmt.Errorf("error: startup folder: malformed json read: %v", err) - s.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) continue } @@ -84,12 +84,12 @@ func (s *server) readStartupFolder() { case sams[i].Message.FromNode == "": sams = append(sams[:i], sams[i+1:]...) er := fmt.Errorf(" error: missing value in fromNode field in startup message, discarding message") - s.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) case sams[i].Message.ToNode == "" && len(sams[i].Message.ToNodes) == 0: sams = append(sams[:i], sams[i+1:]...) er := fmt.Errorf(" error: missing value in both toNode and toNodes fields in startup message, discarding message") - s.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) } // NB: REMOVED CODE! @@ -151,7 +151,7 @@ func (s *server) readSocket() { conn, err := s.StewardSocket.Accept() if err != nil { er := fmt.Errorf("error: failed to accept conn on socket: %v", err) - s.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er, logError) } go func(conn net.Conn) { @@ -164,7 +164,7 @@ func (s *server) readSocket() { _, err = conn.Read(b) if err != nil && err != io.EOF { er := fmt.Errorf("error: failed to read data from socket: %v", err) - s.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) return } @@ -181,7 +181,7 @@ func (s *server) readSocket() { sams, err := s.convertBytesToSAMs(readBytes) if err != nil { er := fmt.Errorf("error: malformed json received on socket: %s\n %v", readBytes, err) - s.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) return } @@ -194,7 +194,7 @@ func (s *server) readSocket() { // Send an info message to the central about the message picked // for auditing. er := fmt.Errorf("info: message read from socket on %v: %v", s.nodeName, sams[i].Message) - s.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo) } // Send the SAM struct to be picked up by the ring buffer. @@ -239,14 +239,14 @@ func (s *server) readFolder() { fh, err := os.Open(event.Name) if err != nil { er := fmt.Errorf("error: readFolder: failed to open readFile from readFolder: %v", err) - s.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) return } b, err := io.ReadAll(fh) if err != nil { er := fmt.Errorf("error: readFolder: failed to readall from readFolder: %v", err) - s.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) fh.Close() return } @@ -258,7 +258,7 @@ func (s *server) readFolder() { sams, err := s.convertBytesToSAMs(b) if err != nil { er := fmt.Errorf("error: readFolder: malformed json received: %s\n %v", b, err) - s.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) return } @@ -271,7 +271,7 @@ func (s *server) readFolder() { // Send an info message to the central about the message picked // for auditing. er := fmt.Errorf("info: readFolder: message read from readFolder on %v: %v", s.nodeName, sams[i].Message) - s.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) } // Send the SAM struct to be picked up by the ring buffer. @@ -281,7 +281,7 @@ func (s *server) readFolder() { err = os.Remove(event.Name) if err != nil { er := fmt.Errorf("error: readFolder: failed to remove readFile from readFolder: %v", err) - s.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) return } @@ -293,7 +293,7 @@ func (s *server) readFolder() { return } er := fmt.Errorf("error: readFolder: file watcher error: %v", err) - s.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) } } }() @@ -322,7 +322,7 @@ func (s *server) readTCPListener() { conn, err := ln.Accept() if err != nil { er := fmt.Errorf("error: failed to accept conn on socket: %v", err) - s.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er, logError) continue } @@ -336,7 +336,7 @@ func (s *server) readTCPListener() { _, err = conn.Read(b) if err != nil && err != io.EOF { er := fmt.Errorf("error: failed to read data from tcp listener: %v", err) - s.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) return } @@ -353,7 +353,7 @@ func (s *server) readTCPListener() { sam, err := s.convertBytesToSAMs(readBytes) if err != nil { er := fmt.Errorf("error: malformed json received on tcp listener: %v", err) - s.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) return } @@ -380,7 +380,7 @@ func (s *server) readHTTPlistenerHandler(w http.ResponseWriter, r *http.Request) _, err := r.Body.Read(b) if err != nil && err != io.EOF { er := fmt.Errorf("error: failed to read data from tcp listener: %v", err) - s.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) return } @@ -397,7 +397,7 @@ func (s *server) readHTTPlistenerHandler(w http.ResponseWriter, r *http.Request) sam, err := s.convertBytesToSAMs(readBytes) if err != nil { er := fmt.Errorf("error: malformed json received on HTTPListener: %v", err) - s.errorKernel.errSend(s.processInitial, Message{}, er) + s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning) return } @@ -465,7 +465,7 @@ func (s *server) convertBytesToSAMs(b []byte) ([]subjectAndMessage, error) { sm, err := newSubjectAndMessage(m) if err != nil { er := fmt.Errorf("error: newSubjectAndMessage: %v", err) - s.errorKernel.errSend(s.processInitial, m, er) + s.errorKernel.errSend(s.processInitial, m, er, logWarning) continue } @@ -509,7 +509,7 @@ func (s *server) checkMessageToNodes(MsgSlice []Message) []Message { // the slice since it is not valid. default: er := fmt.Errorf("error: no toNode or toNodes where specified in the message, dropping message: %v", v) - s.errorKernel.errSend(s.processInitial, v, er) + s.errorKernel.errSend(s.processInitial, v, er, logWarning) continue } } diff --git a/process.go b/process.go index 67c4365..0a14fe1 100644 --- a/process.go +++ b/process.go @@ -218,7 +218,7 @@ func (p process) startPublisher() { err := p.procFunc(p.ctx, p.procFuncCh) if err != nil { er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err) - p.errorKernel.errSend(p, Message{}, er) + p.errorKernel.errSend(p, Message{}, er, logError) } }() } @@ -239,7 +239,7 @@ func (p process) startSubscriber() { err := p.procFunc(p.ctx, p.procFuncCh) if err != nil { er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err) - p.errorKernel.errSend(p, Message{}, er) + p.errorKernel.errSend(p, Message{}, er, logError) } }() } @@ -253,7 +253,7 @@ func (p process) startSubscriber() { err := p.natsSubscription.Unsubscribe() if err != nil { er := fmt.Errorf("error: spawnWorker: got <-ctx.Done, but unable to unsubscribe natsSubscription failed: %v", err) - p.errorKernel.errSend(p, Message{}, er) + p.errorKernel.errSend(p, Message{}, er, logError) p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) } @@ -475,13 +475,13 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, zr, err := zstd.NewReader(nil) if err != nil { er := fmt.Errorf("error: zstd NewReader failed: %v", err) - p.errorKernel.errSend(p, Message{}, er) + p.errorKernel.errSend(p, Message{}, er, logWarning) return } msgData, err = zr.DecodeAll(msg.Data, nil) if err != nil { er := fmt.Errorf("error: zstd decoding failed: %v", err) - p.errorKernel.errSend(p, Message{}, er) + p.errorKernel.errSend(p, Message{}, er, logWarning) zr.Close() return } @@ -493,14 +493,14 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, gr, err := gzip.NewReader(r) if err != nil { er := fmt.Errorf("error: gzip NewReader failed: %v", err) - p.errorKernel.errSend(p, Message{}, er) + p.errorKernel.errSend(p, Message{}, er, logError) return } b, err := io.ReadAll(gr) if err != nil { er := fmt.Errorf("error: gzip ReadAll failed: %v", err) - p.errorKernel.errSend(p, Message{}, er) + p.errorKernel.errSend(p, Message{}, er, logWarning) return } @@ -521,7 +521,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, err := cbor.Unmarshal(msgData, &message) if err != nil { er := fmt.Errorf("error: cbor decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err) - p.errorKernel.errSend(p, message, er) + p.errorKernel.errSend(p, message, er, logError) return } default: // Deaults to gob if no match was found. @@ -531,7 +531,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, err := gobDec.Decode(&message) if err != nil { er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err) - p.errorKernel.errSend(p, message, er) + p.errorKernel.errSend(p, message, er, logError) return } } @@ -544,7 +544,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, err := gobDec.Decode(&message) if err != nil { er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err) - p.errorKernel.errSend(p, message, er) + p.errorKernel.errSend(p, message, er, logError) return } } @@ -578,7 +578,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, p.handler = mh.handler if !ok { er := fmt.Errorf("error: subscriberHandler: no such method type: %v", p.subject.Event) - p.errorKernel.errSend(p, message, er) + p.errorKernel.errSend(p, message, er, logWarning) } } @@ -603,7 +603,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, p.handler = mh.handler if !ok { er := fmt.Errorf("error: subscriberHandler: no such method type: %v", p.subject.Event) - p.errorKernel.errSend(p, message, er) + p.errorKernel.errSend(p, message, er, logWarning) } } @@ -635,7 +635,7 @@ func (p process) callHandler(message Message, thisNode string) []byte { case false: // ACL/Signature checking failed. er := fmt.Errorf("error: subscriberHandler: ACL were verified not-OK, doing nothing") - p.errorKernel.errSend(p, message, er) + p.errorKernel.errSend(p, message, er, logWarning) log.Printf("%v\n", er) } }() @@ -676,7 +676,7 @@ func executeHandler(p process, message Message, thisNode string) { _, err = p.handler(p, message, thisNode) if err != nil { er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) - p.errorKernel.errSend(p, message, er) + p.errorKernel.errSend(p, message, er, logError) p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) } }() @@ -700,7 +700,7 @@ func executeHandler(p process, message Message, thisNode string) { _, err := p.handler(p, message, thisNode) if err != nil { er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) - p.errorKernel.errSend(p, message, er) + p.errorKernel.errSend(p, message, er, logError) p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) } }() @@ -726,7 +726,7 @@ func executeHandler(p process, message Message, thisNode string) { _, err := p.handler(p, message, thisNode) if err != nil { er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) - p.errorKernel.errSend(p, message, er) + p.errorKernel.errSend(p, message, er, logError) p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) } }() diff --git a/processes.go b/processes.go index 60ba8a0..5983211 100644 --- a/processes.go +++ b/processes.go @@ -307,7 +307,7 @@ func (s startup) pubREQHello(p process) { sam, err := newSubjectAndMessage(m) if err != nil { // In theory the system should drop the message before it reaches here. - p.errorKernel.errSend(p, m, err) + p.errorKernel.errSend(p, m, err, logError) log.Printf("error: ProcessesStart: %v\n", err) } proc.toRingbufferCh <- []subjectAndMessage{sam} @@ -364,7 +364,7 @@ func (s startup) pubREQKeysRequestUpdate(p process) { sam, err := newSubjectAndMessage(m) if err != nil { // In theory the system should drop the message before it reaches here. - p.errorKernel.errSend(p, m, err) + p.errorKernel.errSend(p, m, err, logError) log.Printf("error: ProcessesStart: %v\n", err) } proc.toRingbufferCh <- []subjectAndMessage{sam} @@ -421,7 +421,7 @@ func (s startup) pubREQAclRequestUpdate(p process) { sam, err := newSubjectAndMessage(m) if err != nil { // In theory the system should drop the message before it reaches here. - p.errorKernel.errSend(p, m, err) + p.errorKernel.errSend(p, m, err, logError) log.Printf("error: ProcessesStart: %v\n", err) } proc.toRingbufferCh <- []subjectAndMessage{sam} diff --git a/requests.go b/requests.go index 49d7cf0..776146a 100644 --- a/requests.go +++ b/requests.go @@ -480,7 +480,7 @@ func newReplyMessage(proc process, message Message, outData []byte) { if err != nil { // In theory the system should drop the message before it reaches here. er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logError) } proc.toRingbufferCh <- []subjectAndMessage{sam} diff --git a/requests_acl.go b/requests_acl.go index 2571f20..d0cb4ce 100644 --- a/requests_acl.go +++ b/requests_acl.go @@ -81,7 +81,7 @@ func (m methodREQAclRequestUpdate) handler(proc process, message Message, node s js, err := json.Marshal(hdh) if err != nil { er := fmt.Errorf("error: REQAclRequestUpdate : json marshal failed: %v, message: %v", err, message) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } er = fmt.Errorf("----> subscriber methodREQAclRequestUpdate: SENDING ACL'S TO NODE=%v, serializedAndHash=%+v", message.FromNode, hdh) @@ -145,7 +145,7 @@ func (m methodREQAclDeliverUpdate) handler(proc process, message Message, node s err := json.Unmarshal(message.Data, &hdh) if err != nil { er := fmt.Errorf("error: subscriber REQAclDeliverUpdate : json unmarshal failed: %v, message: %v", err, message) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } mapOfFromNodeCommands := make(map[Node]map[command]struct{}) @@ -154,7 +154,7 @@ func (m methodREQAclDeliverUpdate) handler(proc process, message Message, node s err = cbor.Unmarshal(hdh.Data, &mapOfFromNodeCommands) if err != nil { er := fmt.Errorf("error: subscriber REQAclDeliverUpdate : cbor unmarshal failed: %v, message: %v", err, message) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logError) } } @@ -168,7 +168,7 @@ func (m methodREQAclDeliverUpdate) handler(proc process, message Message, node s err = proc.nodeAuth.nodeAcl.saveToFile() if err != nil { er := fmt.Errorf("error: subscriber REQAclDeliverUpdate : save to file failed: %v, message: %v", err, message) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logError) } // Prepare and queue for sending a new message with the output @@ -233,12 +233,12 @@ func (m methodREQAclAddCommand) handler(proc process, message Message, node stri select { case err := <-errCh: - proc.errorKernel.errSend(proc, message, err) + proc.errorKernel.errSend(proc, message, err, logError) case <-ctx.Done(): cancel() er := fmt.Errorf("error: methodREQAclAddAccessList: method timed out: %v", message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logInfo) case out := <-outCh: // Prepare and queue for sending a new message with the output @@ -303,12 +303,12 @@ func (m methodREQAclDeleteCommand) handler(proc process, message Message, node s select { case err := <-errCh: - proc.errorKernel.errSend(proc, message, err) + proc.errorKernel.errSend(proc, message, err, logError) case <-ctx.Done(): cancel() er := fmt.Errorf("error: methodREQAclDeleteCommand: method timed out: %v", message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logInfo) case out := <-outCh: // Prepare and queue for sending a new message with the output @@ -372,12 +372,12 @@ func (m methodREQAclDeleteSource) handler(proc process, message Message, node st select { case err := <-errCh: - proc.errorKernel.errSend(proc, message, err) + proc.errorKernel.errSend(proc, message, err, logError) case <-ctx.Done(): cancel() er := fmt.Errorf("error: methodREQAclDeleteSource: method timed out: %v", message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logInfo) case out := <-outCh: // Prepare and queue for sending a new message with the output @@ -441,12 +441,12 @@ func (m methodREQAclGroupNodesAddNode) handler(proc process, message Message, no select { case err := <-errCh: - proc.errorKernel.errSend(proc, message, err) + proc.errorKernel.errSend(proc, message, err, logError) case <-ctx.Done(): cancel() er := fmt.Errorf("error: methodREQAclGroupNodesAddNode: method timed out: %v", message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logInfo) case out := <-outCh: // Prepare and queue for sending a new message with the output @@ -510,12 +510,12 @@ func (m methodREQAclGroupNodesDeleteNode) handler(proc process, message Message, select { case err := <-errCh: - proc.errorKernel.errSend(proc, message, err) + proc.errorKernel.errSend(proc, message, err, logError) case <-ctx.Done(): cancel() er := fmt.Errorf("error: methodREQAclGroupNodesDeleteNode: method timed out: %v", message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logInfo) case out := <-outCh: // Prepare and queue for sending a new message with the output @@ -578,12 +578,12 @@ func (m methodREQAclGroupNodesDeleteGroup) handler(proc process, message Message select { case err := <-errCh: - proc.errorKernel.errSend(proc, message, err) + proc.errorKernel.errSend(proc, message, err, logError) case <-ctx.Done(): cancel() er := fmt.Errorf("error: methodREQAclGroupNodesDeleteGroup: method timed out: %v", message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logInfo) case out := <-outCh: // Prepare and queue for sending a new message with the output @@ -647,12 +647,12 @@ func (m methodREQAclGroupCommandsAddCommand) handler(proc process, message Messa select { case err := <-errCh: - proc.errorKernel.errSend(proc, message, err) + proc.errorKernel.errSend(proc, message, err, logError) case <-ctx.Done(): cancel() er := fmt.Errorf("error: methodREQAclGroupCommandsAddCommand: method timed out: %v", message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logInfo) case out := <-outCh: // Prepare and queue for sending a new message with the output @@ -716,12 +716,12 @@ func (m methodREQAclGroupCommandsDeleteCommand) handler(proc process, message Me select { case err := <-errCh: - proc.errorKernel.errSend(proc, message, err) + proc.errorKernel.errSend(proc, message, err, logError) case <-ctx.Done(): cancel() er := fmt.Errorf("error: methodREQAclGroupCommandsDeleteCommand: method timed out: %v", message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logInfo) case out := <-outCh: // Prepare and queue for sending a new message with the output @@ -784,12 +784,12 @@ func (m methodREQAclGroupCommandsDeleteGroup) handler(proc process, message Mess select { case err := <-errCh: - proc.errorKernel.errSend(proc, message, err) + proc.errorKernel.errSend(proc, message, err, logError) case <-ctx.Done(): cancel() er := fmt.Errorf("error: methodREQAclGroupCommandsDeleteGroup: method timed out: %v", message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logInfo) case out := <-outCh: // Prepare and queue for sending a new message with the output @@ -848,12 +848,12 @@ func (m methodREQAclExport) handler(proc process, message Message, node string) select { case err := <-errCh: - proc.errorKernel.errSend(proc, message, err) + proc.errorKernel.errSend(proc, message, err, logError) case <-ctx.Done(): cancel() er := fmt.Errorf("error: methodREQAclExport: method timed out: %v", message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logInfo) case out := <-outCh: // Prepare and queue for sending a new message with the output @@ -920,12 +920,12 @@ func (m methodREQAclImport) handler(proc process, message Message, node string) select { case err := <-errCh: - proc.errorKernel.errSend(proc, message, err) + proc.errorKernel.errSend(proc, message, err, logError) case <-ctx.Done(): cancel() er := fmt.Errorf("error: methodREQAclImport: method timed out") - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logInfo) case out := <-outCh: // Prepare and queue for sending a new message with the output diff --git a/requests_cli.go b/requests_cli.go index ea466c9..746df12 100644 --- a/requests_cli.go +++ b/requests_cli.go @@ -39,7 +39,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string) switch { case len(message.MethodArgs) < 1: er := fmt.Errorf("error: methodREQCliCommand: got <1 number methodArgs") - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) newReplyMessage(proc, msgForErrors, []byte(er.Error())) return @@ -90,7 +90,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string) err := cmd.Run() if err != nil { er := fmt.Errorf("error: methodREQCliCommand: cmd.Run failed : %v, methodArgs: %v, error_output: %v", err, message.MethodArgs, stderr.String()) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) newReplyMessage(proc, msgForErrors, []byte(er.Error())) } @@ -105,7 +105,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string) case <-ctx.Done(): cancel() er := fmt.Errorf("error: methodREQCliCommand: method timed out: %v", message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) newReplyMessage(proc, msgForErrors, []byte(er.Error())) case out := <-outCh: cancel() @@ -168,7 +168,7 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str switch { case len(message.MethodArgs) < 1: er := fmt.Errorf("error: methodREQCliCommand: got <1 number methodArgs") - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) newReplyMessage(proc, msgForErrors, []byte(er.Error())) return @@ -197,20 +197,20 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str outReader, err := cmd.StdoutPipe() if err != nil { er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StdoutPipe failed : %v, methodArgs: %v", err, message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) newReplyMessage(proc, msgForErrors, []byte(er.Error())) } ErrorReader, err := cmd.StderrPipe() if err != nil { er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StderrPipe failed : %v, methodArgs: %v", err, message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) newReplyMessage(proc, msgForErrors, []byte(er.Error())) } if err := cmd.Start(); err != nil { er := fmt.Errorf("error: methodREQCliCommandCont: cmd.Start failed : %v, methodArgs: %v", err, message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) newReplyMessage(proc, msgForErrors, []byte(er.Error())) } @@ -241,7 +241,7 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str if err := cmd.Wait(); err != nil { er := fmt.Errorf("info: methodREQCliCommandCont: method timeout reached, canceled: methodArgs: %v, %v", message.MethodArgs, err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } }() diff --git a/requests_copy.go b/requests_copy.go index 65a5242..1514e9e 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -98,7 +98,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ sam, err := newSubjectAndMessage(message) if err != nil { er := fmt.Errorf("error: newSubjectAndMessage failed: %v, message=%v", err, message) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } proc.toRingbufferCh <- []subjectAndMessage{sam} @@ -128,7 +128,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ if len(message.MethodArgs) < 3 { er := fmt.Errorf("error: methodREQCopySrc: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath") - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return } @@ -138,7 +138,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ splitChunkSize, err = strconv.Atoi(message.MethodArgs[3]) if err != nil { er := fmt.Errorf("error: methodREQCopySrc: unble to convert splitChunkSize into int value: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } } @@ -148,7 +148,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ maxTotalCopyTime, err = strconv.Atoi(message.MethodArgs[4]) if err != nil { er := fmt.Errorf("error: methodREQCopySrc: unble to convert maxTotalCopyTime into int value: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } } @@ -164,7 +164,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) if err != nil { er := fmt.Errorf("error: methodREQCopySrc: unable to convert folderPermission into int value: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } } @@ -243,7 +243,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ cb, err := cbor.Marshal(cia) if err != nil { er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) cancel() } @@ -262,7 +262,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ sam, err := newSubjectAndMessage(msg) if err != nil { er := fmt.Errorf("error: newSubjectAndMessage failed: %v, message=%v", err, message) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) cancel() } @@ -312,7 +312,7 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([ err := cbor.Unmarshal(message.Data, &cia) if err != nil { er := fmt.Errorf("error: methodREQCopyDst: failed to cbor Unmarshal data: %v, message=%v", err, message) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return } @@ -460,7 +460,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel fh, err := os.Open(cia.SrcFilePath) if err != nil { er := fmt.Errorf("error: copySrcSubProcFunc: failed to open file: %v", err) - proc.errorKernel.errSend(proc, Message{}, er) + proc.errorKernel.errSend(proc, Message{}, er, logWarning) newReplyMessage(proc, msgForSubErrors, []byte(er.Error())) return er } @@ -480,7 +480,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel err := cbor.Unmarshal(message.Data, &csa) if err != nil { er := fmt.Errorf("error: copySrcSubHandler: cbor unmarshal of csa failed: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) newReplyMessage(proc, msgForSubErrors, []byte(er.Error())) return er } @@ -495,7 +495,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel n, err := fh.Read(b) if err != nil && err != io.EOF { er := fmt.Errorf("error: copySrcSubHandler: failed to read chunk from file: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) newReplyMessage(proc, msgForSubErrors, []byte(er.Error())) return er } @@ -524,7 +524,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel csaSerialized, err := cbor.Marshal(csa) if err != nil { er := fmt.Errorf("error: copySrcSubProcFunc: cbor marshal of csa failed: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) newReplyMessage(proc, msgForSubErrors, []byte(er.Error())) return er } @@ -546,7 +546,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel sam, err := newSubjectAndMessage(msg) if err != nil { er := fmt.Errorf("copySrcProcSubFunc: newSubjectAndMessage failed: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) newReplyMessage(proc, msgForSubErrors, []byte(er.Error())) return er } @@ -568,7 +568,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel case copyResendLast: if resendRetries > message.Retries { er := fmt.Errorf("error: %v: failed to resend the chunk for the %v time, giving up", cia.DstMethod, resendRetries) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) newReplyMessage(proc, msgForSubErrors, []byte(er.Error())) // NB: Should we call cancel here, or wait for the timeout ? proc.ctxCancel() @@ -594,7 +594,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel csaSerialized, err := cbor.Marshal(csa) if err != nil { er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) newReplyMessage(proc, msgForSubErrors, []byte(er.Error())) return er } @@ -616,7 +616,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel sam, err := newSubjectAndMessage(msg) if err != nil { er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) newReplyMessage(proc, msgForSubErrors, []byte(er.Error())) return er } @@ -633,7 +633,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel default: er := fmt.Errorf("error: copySrcSubProcFunc: not valid copyStatus, exiting: %v", csa.CopyStatus) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) newReplyMessage(proc, msgForSubErrors, []byte(er.Error())) return er } @@ -657,7 +657,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc csaSerialized, err := cbor.Marshal(csa) if err != nil { er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return er } @@ -679,7 +679,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc sam, err := newSubjectAndMessage(msg) if err != nil { er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return er } @@ -691,7 +691,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc err = os.Mkdir(tmpFolder, 0770) if err != nil { er := fmt.Errorf("copyDstProcSubFunc: create tmp folder for copying failed: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return er } @@ -699,7 +699,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc err = os.RemoveAll(tmpFolder) if err != nil { er := fmt.Errorf("error: copyDstSubProcFunc: remove temp dir failed: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } }() @@ -714,7 +714,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc err := cbor.Unmarshal(message.Data, &csa) if err != nil { er := fmt.Errorf("error: copySrcSubHandler: cbor unmarshal of csa failed: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return er } @@ -749,7 +749,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc }() if err != nil { - proc.errorKernel.errSend(proc, message, err) + proc.errorKernel.errSend(proc, message, err, logWarning) return err } @@ -761,7 +761,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc csaSer, err := cbor.Marshal(csa) if err != nil { er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return er } @@ -781,7 +781,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc sam, err := newSubjectAndMessage(msg) if err != nil { er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return er } @@ -794,7 +794,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc csaSer, err := cbor.Marshal(csa) if err != nil { er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return er } @@ -814,7 +814,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc sam, err := newSubjectAndMessage(msg) if err != nil { er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return er } @@ -847,7 +847,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc mainfh, err := os.OpenFile(filePath, os.O_TRUNC|os.O_RDWR|os.O_CREATE|os.O_SYNC, cia.FileMode) if err != nil { er := fmt.Errorf("error: copyDstSubProcFunc: open final destination file failed: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return er } defer mainfh.Close() @@ -893,7 +893,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc if err != nil { er := fmt.Errorf("error: copyDstSubProcFunc: creation of slice of chunk paths failed: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return er } @@ -925,14 +925,14 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc if err != nil { er := fmt.Errorf("error: copyDstSubProcFunc: write to final destination file failed: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } // Remove the backup file. err = os.Remove(backupOriginalFileName) if err != nil && !os.IsNotExist(err) { er := fmt.Errorf("error: copyDstSubProcFunc: remove of backup of original file failed: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } er = fmt.Errorf("info: copy: successfully wrote all split chunk files into file=%v", filePath) @@ -947,7 +947,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc csaSerialized, err := cbor.Marshal(csa) if err != nil { er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return er } @@ -968,7 +968,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc sam, err := newSubjectAndMessage(msg) if err != nil { er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return er } diff --git a/requests_file_handling.go b/requests_file_handling.go index 18cd0ed..2fab731 100644 --- a/requests_file_handling.go +++ b/requests_file_handling.go @@ -28,7 +28,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin err := os.MkdirAll(folderTree, 0770) if err != nil { er := fmt.Errorf("error: methodREQToFileAppend: failed to create toFileAppend directory tree:%v, subject: %v, %v", folderTree, proc.subject, err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree) @@ -40,7 +40,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0660) if err != nil { er := fmt.Errorf("error: methodREQToFileAppend.handler: failed to open file: %v, %v", file, err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return nil, err } defer f.Close() @@ -49,7 +49,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin f.Sync() if err != nil { er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file : %v, %v", file, err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) @@ -79,7 +79,7 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([] err := os.MkdirAll(folderTree, 0770) if err != nil { er := fmt.Errorf("error: methodREQToFile failed to create toFile directory tree: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return nil, er } @@ -93,7 +93,7 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([] f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755) if err != nil { er := fmt.Errorf("error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return nil, err } @@ -103,7 +103,7 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([] f.Sync() if err != nil { er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: file: %v, %v", file, err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) @@ -134,7 +134,7 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) ( switch { case len(message.MethodArgs) < 1: er := fmt.Errorf("error: methodREQTailFile: got <1 number methodArgs") - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return } @@ -161,7 +161,7 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) ( }}) if err != nil { er := fmt.Errorf("error: methodREQToTailFile: tailFile: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } proc.processes.wg.Add(1) diff --git a/requests_http.go b/requests_http.go index 1963d95..439b673 100644 --- a/requests_http.go +++ b/requests_http.go @@ -32,7 +32,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([ switch { case len(message.MethodArgs) < 1: er := fmt.Errorf("error: methodREQHttpGet: got <1 number methodArgs") - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) newReplyMessage(proc, msgForErrors, []byte(er.Error())) return @@ -50,7 +50,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([ req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, bailing out: %v", err, message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) newReplyMessage(proc, msgForErrors, []byte(er.Error())) cancel() return @@ -65,7 +65,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([ resp, err := client.Do(req) if err != nil { er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, bailing out: %v", err, message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) newReplyMessage(proc, msgForErrors, []byte(er.Error())) return } @@ -74,7 +74,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([ if resp.StatusCode != 200 { cancel() er := fmt.Errorf("error: methodREQHttpGet: not 200, were %#v, bailing out: %v", resp.StatusCode, message) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) newReplyMessage(proc, msgForErrors, []byte(er.Error())) return } @@ -82,7 +82,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([ body, err := io.ReadAll(resp.Body) if err != nil { er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, methodArgs: %v", err, message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) newReplyMessage(proc, msgForErrors, []byte(er.Error())) } @@ -99,7 +99,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([ case <-ctx.Done(): cancel() er := fmt.Errorf("error: methodREQHttpGet: method timed out: %v", message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) newReplyMessage(proc, msgForErrors, []byte(er.Error())) case out := <-outCh: cancel() @@ -140,7 +140,7 @@ func (m methodREQHttpGetScheduled) handler(proc process, message Message, node s switch { case len(message.MethodArgs) < 3: er := fmt.Errorf("error: methodREQHttpGet: got <3 number methodArgs. Want URL, Schedule Interval in seconds, and the total time in minutes the scheduler should run for") - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return } @@ -150,14 +150,14 @@ func (m methodREQHttpGetScheduled) handler(proc process, message Message, node s scheduleInterval, err := strconv.Atoi(message.MethodArgs[1]) if err != nil { er := fmt.Errorf("error: methodREQHttpGetScheduled: schedule interval value is not a valid int number defined as a string value seconds: %v, bailing out: %v", err, message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return } schedulerTotalTime, err := strconv.Atoi(message.MethodArgs[2]) if err != nil { er := fmt.Errorf("error: methodREQHttpGetScheduled: scheduler total time value is not a valid int number defined as a string value minutes: %v, bailing out: %v", err, message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return } @@ -191,7 +191,7 @@ func (m methodREQHttpGetScheduled) handler(proc process, message Message, node s req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, error: %v", err, message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) cancel() return } @@ -204,7 +204,7 @@ func (m methodREQHttpGetScheduled) handler(proc process, message Message, node s resp, err := client.Do(req) if err != nil { er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, error: %v", err, message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return } defer resp.Body.Close() @@ -212,14 +212,14 @@ func (m methodREQHttpGetScheduled) handler(proc process, message Message, node s if resp.StatusCode != 200 { cancel() er := fmt.Errorf("error: methodREQHttpGet: not 200, were %#v, error: %v", resp.StatusCode, message) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return } body, err := io.ReadAll(resp.Body) if err != nil { er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, methodArgs: %v", err, message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } out := body @@ -250,7 +250,7 @@ func (m methodREQHttpGetScheduled) handler(proc process, message Message, node s // fmt.Printf(" * DEBUG: <-ctxScheduler.Done()\n") cancel() er := fmt.Errorf("error: methodREQHttpGet: schedule context timed out: %v", message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return case out := <-outCh: // Prepare and queue for sending a new message with the output diff --git a/requests_keys.go b/requests_keys.go index d9eaf2c..71c42c4 100644 --- a/requests_keys.go +++ b/requests_keys.go @@ -122,7 +122,7 @@ func (m methodREQKeysRequestUpdate) handler(proc process, message Message, node if err != nil { er := fmt.Errorf("error: methodREQKeysRequestUpdate, failed to marshal keys map: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } er = fmt.Errorf("----> methodREQKeysRequestUpdate: SENDING KEYS TO NODE=%v", message.FromNode) proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) @@ -183,7 +183,7 @@ func (m methodREQKeysDeliverUpdate) handler(proc process, message Message, node err := json.Unmarshal(message.Data, &keysAndHash) if err != nil { er := fmt.Errorf("error: REQKeysDeliverUpdate : json unmarshal failed: %v, message: %v", err, message) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } er := fmt.Errorf("<---- REQKeysDeliverUpdate: after unmarshal, nodeAuth keysAndhash contains: %+v", keysAndHash) @@ -201,7 +201,7 @@ func (m methodREQKeysDeliverUpdate) handler(proc process, message Message, node if err != nil { er := fmt.Errorf("error: REQKeysDeliverUpdate : json unmarshal failed: %v, message: %v", err, message) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } // We need to also persist the hash on the receiving nodes. We can then load @@ -210,7 +210,7 @@ func (m methodREQKeysDeliverUpdate) handler(proc process, message Message, node err = proc.nodeAuth.publicKeys.saveToFile() if err != nil { er := fmt.Errorf("error: REQKeysDeliverUpdate : save to file failed: %v, message: %v", err, message) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } // Prepare and queue for sending a new message with the output @@ -304,7 +304,7 @@ func (m methodREQKeysAllow) handler(proc process, message Message, node string) err := pushKeys(proc, message, []Node{}) if err != nil { - proc.errorKernel.errSend(proc, message, err) + proc.errorKernel.errSend(proc, message, err, logWarning) return } @@ -363,7 +363,7 @@ func pushKeys(proc process, message Message, nodes []Node) error { if err != nil { // In theory the system should drop the message before it reaches here. er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } proc.toRingbufferCh <- []subjectAndMessage{sam} @@ -377,7 +377,7 @@ func pushKeys(proc process, message Message, nodes []Node) error { if err != nil { er := fmt.Errorf("error: methodREQKeysAllow, failed to marshal keys map: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } proc.centralAuth.pki.nodesAcked.mu.Lock() @@ -410,7 +410,7 @@ func pushKeys(proc process, message Message, nodes []Node) error { if err != nil { // In theory the system should drop the message before it reaches here. er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } proc.toRingbufferCh <- []subjectAndMessage{sam} @@ -480,7 +480,7 @@ func (m methodREQKeysDelete) handler(proc process, message Message, node string) err := pushKeys(proc, message, nodes) if err != nil { - proc.errorKernel.errSend(proc, message, err) + proc.errorKernel.errSend(proc, message, err, logWarning) return } @@ -496,12 +496,12 @@ func (m methodREQKeysDelete) handler(proc process, message Message, node string) select { case err := <-errCh: - proc.errorKernel.errSend(proc, message, err) + proc.errorKernel.errSend(proc, message, err, logWarning) case <-ctx.Done(): cancel() er := fmt.Errorf("error: methodREQAclGroupNodesDeleteNode: method timed out: %v", message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) case out := <-outCh: diff --git a/requests_operator.go b/requests_operator.go index 7e679aa..d9f24cd 100644 --- a/requests_operator.go +++ b/requests_operator.go @@ -68,7 +68,7 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str switch { case len(message.MethodArgs) < 1: er := fmt.Errorf("error: methodREQOpProcessStart: got <1 number methodArgs") - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return } @@ -77,7 +77,7 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str tmpH := mt.getHandler(Method(method)) if tmpH == nil { er := fmt.Errorf("error: OpProcessStart: no such request type defined: %v" + m) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return } @@ -88,7 +88,7 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode) er := fmt.Errorf(txt) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) out = []byte(txt + "\n") newReplyMessage(proc, message, out) @@ -133,7 +133,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri if v := len(message.MethodArgs); v != 3 { er := fmt.Errorf("error: methodREQOpProcessStop: got <4 number methodArgs, want: method,node,kind") - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } methodString := message.MethodArgs[0] @@ -144,7 +144,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri tmpH := mt.getHandler(Method(method)) if tmpH == nil { er := fmt.Errorf("error: OpProcessStop: no such request type defined: %v, check that the methodArgs are correct: " + methodString) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return } @@ -170,7 +170,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri err := toStopProc.natsSubscription.Unsubscribe() if err != nil { er := fmt.Errorf("error: methodREQOpStopProcess failed to stop nats.Subscription: %v, methodArgs: %v", err, message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } // Remove the prometheus label @@ -178,7 +178,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri txt := fmt.Sprintf("info: OpProcessStop: process stopped id: %v, method: %v on: %v", toStopProc.processID, sub, message.ToNode) er := fmt.Errorf(txt) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) out = []byte(txt + "\n") newReplyMessage(proc, message, out) @@ -186,7 +186,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri } else { txt := fmt.Sprintf("error: OpProcessStop: did not find process to stop: %v on %v", sub, message.ToNode) er := fmt.Errorf(txt) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) out = []byte(txt + "\n") newReplyMessage(proc, message, out) diff --git a/requests_std.go b/requests_std.go index 9a6e779..a8df556 100644 --- a/requests_std.go +++ b/requests_std.go @@ -51,7 +51,7 @@ func (m methodREQHello) handler(proc process, message Message, node string) ([]b f.Sync() if err != nil { er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } // -------------------------- @@ -106,7 +106,7 @@ func (m methodREQErrorLog) handler(proc process, message Message, node string) ( f.Sync() if err != nil { er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: %v", err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) @@ -136,7 +136,7 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by err := os.MkdirAll(folderTree, 0770) if err != nil { er := fmt.Errorf("error: methodREQPing.handler: failed to create toFile directory tree: %v, %v", folderTree, err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return nil, er } @@ -150,7 +150,7 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755) if err != nil { er := fmt.Errorf("error: methodREQPing.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return nil, err } @@ -162,7 +162,7 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by f.Sync() if err != nil { er := fmt.Errorf("error: methodREQPing.handler: failed to write to file: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } proc.processes.wg.Add(1) @@ -199,7 +199,7 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by err := os.MkdirAll(folderTree, 0770) if err != nil { er := fmt.Errorf("error: methodREQPong.handler: failed to create toFile directory tree %v: %v", folderTree, err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return nil, er } @@ -213,7 +213,7 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755) if err != nil { er := fmt.Errorf("error: methodREQPong.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) return nil, err } @@ -225,7 +225,7 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by f.Sync() if err != nil { er := fmt.Errorf("error: methodREQPong.handler: failed to write to file: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err) - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) @@ -252,7 +252,7 @@ func (m methodREQToConsole) handler(proc process, message Message, node string) proc.processes.tui.toConsoleCh <- message.Data } else { er := fmt.Errorf("error: no tui client started") - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } case len(message.MethodArgs) > 0 && message.MethodArgs[0] == "stderr": log.Printf("* DEBUG: MethodArgs: got stderr \n") @@ -285,7 +285,7 @@ func (m methodREQTuiToConsole) handler(proc process, message Message, node strin proc.processes.tui.toConsoleCh <- message.Data } else { er := fmt.Errorf("error: no tui client started") - proc.errorKernel.errSend(proc, message, er) + proc.errorKernel.errSend(proc, message, er, logWarning) } ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) diff --git a/ringbuffer.go b/ringbuffer.go index 26c01a1..3cc879b 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -203,14 +203,14 @@ func (r *ringBuffer) fillBuffer(ctx context.Context, inCh chan subjectAndMessage js, err := json.Marshal(samV) if err != nil { er := fmt.Errorf("error:fillBuffer: json marshaling: %v", err) - r.errorKernel.errSend(r.processInitial, Message{}, er) + r.errorKernel.errSend(r.processInitial, Message{}, er, logError) } // Store the incomming message in key/value store err = r.dbUpdate(r.db, r.samValueBucket, strconv.Itoa(dbID), js) if err != nil { er := fmt.Errorf("error: dbUpdate samValue failed: %v", err) - r.errorKernel.errSend(r.processInitial, Message{}, er) + r.errorKernel.errSend(r.processInitial, Message{}, er, logError) } } @@ -328,7 +328,7 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB js, err := json.Marshal(msgForPermStore) if err != nil { er := fmt.Errorf("error:fillBuffer: json marshaling: %v", err) - r.errorKernel.errSend(r.processInitial, Message{}, er) + r.errorKernel.errSend(r.processInitial, Message{}, er, logError) } r.permStore <- time.Now().Format("Mon Jan _2 15:04:05 2006") + ", " + string(js) + "\n" diff --git a/server.go b/server.go index b5d4598..68f5749 100644 --- a/server.go +++ b/server.go @@ -352,7 +352,7 @@ func (s *server) directSAMSChRead() { mh, ok := p.methodsAvailable.CheckIfExists(sams[i].Message.Method) if !ok { er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Event) - p.errorKernel.errSend(p, sams[i].Message, er) + p.errorKernel.errSend(p, sams[i].Message, er, logError) continue } @@ -453,7 +453,7 @@ func (s *server) routeMessagesToProcess(dbFileName string) { // Check if the format of the message is correct. if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok { er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method) - s.errorKernel.errSend(s.processInitial, sam.Message, er) + s.errorKernel.errSend(s.processInitial, sam.Message, er, logError) return }