mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
Added x/slog for stderr logging
This commit is contained in:
parent
215a4c387a
commit
807455119c
16 changed files with 195 additions and 172 deletions
|
@ -157,7 +157,7 @@ func (c *centralAuth) deletePublicKeys(proc process, msg Message, nodes []string
|
|||
|
||||
err := c.pki.dbDeletePublicKeys(c.pki.bucketNamePublicKeys, nodes)
|
||||
if err != nil {
|
||||
proc.errorKernel.errSend(proc, msg, err)
|
||||
proc.errorKernel.errSend(proc, msg, err, logWarning)
|
||||
}
|
||||
|
||||
er := fmt.Errorf("info: detected new public key for node: %v. This key will need to be authorized by operator to be allowed into the system", msg.FromNode)
|
||||
|
@ -287,7 +287,7 @@ func (c *centralAuth) updateHash(proc process, message Message) {
|
|||
b, err := cbor.Marshal(sortedNodesAndKeys)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQKeysAllow, failed to marshal slice, and will not update hash for public keys: %v", err)
|
||||
c.pki.errorKernel.errSend(proc, message, er)
|
||||
c.pki.errorKernel.errSend(proc, message, er, logWarning)
|
||||
log.Printf(" * DEBUG: %v\n", er)
|
||||
|
||||
return
|
||||
|
@ -301,7 +301,7 @@ func (c *centralAuth) updateHash(proc process, message Message) {
|
|||
c.pki.dbUpdateHash(hash[:])
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQKeysAllow, failed to store the hash into the db: %v", err)
|
||||
c.pki.errorKernel.errSend(proc, message, er)
|
||||
c.pki.errorKernel.errSend(proc, message, er, logWarning)
|
||||
log.Printf(" * DEBUG: %v\n", er)
|
||||
|
||||
return
|
||||
|
|
|
@ -52,6 +52,7 @@ func newErrorKernel(ctx context.Context, m *metrics, configuration *Configuratio
|
|||
|
||||
type logLevel string
|
||||
|
||||
const logError logLevel = "error"
|
||||
const logInfo logLevel = "info"
|
||||
const logWarning logLevel = "warning"
|
||||
const logDebug logLevel = "debug"
|
||||
|
@ -80,6 +81,11 @@ func (e *errorKernel) start(ringBufferBulkInCh chan<- []subjectAndMessage) error
|
|||
}
|
||||
|
||||
switch {
|
||||
case e.configuration.LogLevel == string(logError):
|
||||
opts := slog.HandlerOptions{Level: slog.LevelError,
|
||||
ReplaceAttr: replaceFunc}
|
||||
slog.SetDefault(slog.New(opts.NewTextHandler(os.Stderr)))
|
||||
|
||||
case e.configuration.LogLevel == string(logInfo):
|
||||
opts := slog.HandlerOptions{Level: slog.LevelInfo,
|
||||
ReplaceAttr: replaceFunc}
|
||||
|
@ -133,9 +139,23 @@ func (e *errorKernel) start(ringBufferBulkInCh chan<- []subjectAndMessage) error
|
|||
// Put the message on the channel to the ringbuffer.
|
||||
ringBufferBulkInCh <- []subjectAndMessage{sam}
|
||||
|
||||
if errEvent.process.configuration.EnableDebug {
|
||||
log.Printf("%v\n", er)
|
||||
// if errEvent.process.configuration.EnableDebug {
|
||||
// log.Printf("%v\n", er)
|
||||
// }
|
||||
|
||||
switch errEvent.logLevel {
|
||||
case logError:
|
||||
slog.Error("%v\n", fmt.Errorf("%v", er), "error")
|
||||
case logInfo:
|
||||
slog.Info("%v\n", er)
|
||||
case logWarning:
|
||||
slog.Warn("%v\n", er)
|
||||
case logDebug:
|
||||
slog.Debug("%v\n", er)
|
||||
case logNone:
|
||||
// Do nothing for type logNone errors.
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Check the type of the error to decide what to do.
|
||||
|
@ -186,7 +206,7 @@ func (e *errorKernel) start(ringBufferBulkInCh chan<- []subjectAndMessage) error
|
|||
}
|
||||
|
||||
// We also want to log the error.
|
||||
e.errSend(errEvent.process, errEvent.message, errEvent.err)
|
||||
e.errSend(errEvent.process, errEvent.message, errEvent.err, logWarning)
|
||||
}()
|
||||
|
||||
default:
|
||||
|
@ -211,6 +231,8 @@ type errorEvent struct {
|
|||
process process
|
||||
// The message that where in progress when error occured
|
||||
message Message
|
||||
// Level, the log level of the severity
|
||||
logLevel logLevel
|
||||
}
|
||||
|
||||
func (e errorEvent) Error() string {
|
||||
|
@ -218,12 +240,13 @@ func (e errorEvent) Error() string {
|
|||
}
|
||||
|
||||
// errSend will just send an error message to the errorCentral.
|
||||
func (e *errorKernel) errSend(proc process, msg Message, err error) {
|
||||
func (e *errorKernel) errSend(proc process, msg Message, err error, logLevel logLevel) {
|
||||
ev := errorEvent{
|
||||
err: err,
|
||||
errorType: errTypeSendError,
|
||||
process: proc,
|
||||
message: msg,
|
||||
logLevel: logLevel,
|
||||
// We don't want to create any actions when just
|
||||
// sending errors.
|
||||
// errorActionCh: make(chan errorAction),
|
||||
|
|
|
@ -34,7 +34,7 @@ func (s *server) readStartupFolder() {
|
|||
filePaths, err := s.getFilePaths(startupFolder)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: readStartupFolder: unable to get filenames: %v", err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -64,7 +64,7 @@ func (s *server) readStartupFolder() {
|
|||
}(filePath)
|
||||
|
||||
if err != nil {
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, err, logWarning)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -74,7 +74,7 @@ func (s *server) readStartupFolder() {
|
|||
sams, err := s.convertBytesToSAMs(readBytes)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: startup folder: malformed json read: %v", err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -84,12 +84,12 @@ func (s *server) readStartupFolder() {
|
|||
case sams[i].Message.FromNode == "":
|
||||
sams = append(sams[:i], sams[i+1:]...)
|
||||
er := fmt.Errorf(" error: missing value in fromNode field in startup message, discarding message")
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
||||
|
||||
case sams[i].Message.ToNode == "" && len(sams[i].Message.ToNodes) == 0:
|
||||
sams = append(sams[:i], sams[i+1:]...)
|
||||
er := fmt.Errorf(" error: missing value in both toNode and toNodes fields in startup message, discarding message")
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
||||
}
|
||||
|
||||
// NB: REMOVED CODE!
|
||||
|
@ -151,7 +151,7 @@ func (s *server) readSocket() {
|
|||
conn, err := s.StewardSocket.Accept()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: failed to accept conn on socket: %v", err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logError)
|
||||
}
|
||||
|
||||
go func(conn net.Conn) {
|
||||
|
@ -164,7 +164,7 @@ func (s *server) readSocket() {
|
|||
_, err = conn.Read(b)
|
||||
if err != nil && err != io.EOF {
|
||||
er := fmt.Errorf("error: failed to read data from socket: %v", err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -181,7 +181,7 @@ func (s *server) readSocket() {
|
|||
sams, err := s.convertBytesToSAMs(readBytes)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: malformed json received on socket: %s\n %v", readBytes, err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -194,7 +194,7 @@ func (s *server) readSocket() {
|
|||
// Send an info message to the central about the message picked
|
||||
// for auditing.
|
||||
er := fmt.Errorf("info: message read from socket on %v: %v", s.nodeName, sams[i].Message)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo)
|
||||
}
|
||||
|
||||
// Send the SAM struct to be picked up by the ring buffer.
|
||||
|
@ -239,14 +239,14 @@ func (s *server) readFolder() {
|
|||
fh, err := os.Open(event.Name)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: readFolder: failed to open readFile from readFolder: %v", err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
b, err := io.ReadAll(fh)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: readFolder: failed to readall from readFolder: %v", err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
||||
fh.Close()
|
||||
return
|
||||
}
|
||||
|
@ -258,7 +258,7 @@ func (s *server) readFolder() {
|
|||
sams, err := s.convertBytesToSAMs(b)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: readFolder: malformed json received: %s\n %v", b, err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -271,7 +271,7 @@ func (s *server) readFolder() {
|
|||
// Send an info message to the central about the message picked
|
||||
// for auditing.
|
||||
er := fmt.Errorf("info: readFolder: message read from readFolder on %v: %v", s.nodeName, sams[i].Message)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
||||
}
|
||||
|
||||
// Send the SAM struct to be picked up by the ring buffer.
|
||||
|
@ -281,7 +281,7 @@ func (s *server) readFolder() {
|
|||
err = os.Remove(event.Name)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: readFolder: failed to remove readFile from readFolder: %v", err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -293,7 +293,7 @@ func (s *server) readFolder() {
|
|||
return
|
||||
}
|
||||
er := fmt.Errorf("error: readFolder: file watcher error: %v", err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
@ -322,7 +322,7 @@ func (s *server) readTCPListener() {
|
|||
conn, err := ln.Accept()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: failed to accept conn on socket: %v", err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logError)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -336,7 +336,7 @@ func (s *server) readTCPListener() {
|
|||
_, err = conn.Read(b)
|
||||
if err != nil && err != io.EOF {
|
||||
er := fmt.Errorf("error: failed to read data from tcp listener: %v", err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -353,7 +353,7 @@ func (s *server) readTCPListener() {
|
|||
sam, err := s.convertBytesToSAMs(readBytes)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: malformed json received on tcp listener: %v", err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -380,7 +380,7 @@ func (s *server) readHTTPlistenerHandler(w http.ResponseWriter, r *http.Request)
|
|||
_, err := r.Body.Read(b)
|
||||
if err != nil && err != io.EOF {
|
||||
er := fmt.Errorf("error: failed to read data from tcp listener: %v", err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -397,7 +397,7 @@ func (s *server) readHTTPlistenerHandler(w http.ResponseWriter, r *http.Request)
|
|||
sam, err := s.convertBytesToSAMs(readBytes)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: malformed json received on HTTPListener: %v", err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -465,7 +465,7 @@ func (s *server) convertBytesToSAMs(b []byte) ([]subjectAndMessage, error) {
|
|||
sm, err := newSubjectAndMessage(m)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: newSubjectAndMessage: %v", err)
|
||||
s.errorKernel.errSend(s.processInitial, m, er)
|
||||
s.errorKernel.errSend(s.processInitial, m, er, logWarning)
|
||||
|
||||
continue
|
||||
}
|
||||
|
@ -509,7 +509,7 @@ func (s *server) checkMessageToNodes(MsgSlice []Message) []Message {
|
|||
// the slice since it is not valid.
|
||||
default:
|
||||
er := fmt.Errorf("error: no toNode or toNodes where specified in the message, dropping message: %v", v)
|
||||
s.errorKernel.errSend(s.processInitial, v, er)
|
||||
s.errorKernel.errSend(s.processInitial, v, er, logWarning)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
|
32
process.go
32
process.go
|
@ -218,7 +218,7 @@ func (p process) startPublisher() {
|
|||
err := p.procFunc(p.ctx, p.procFuncCh)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err)
|
||||
p.errorKernel.errSend(p, Message{}, er)
|
||||
p.errorKernel.errSend(p, Message{}, er, logError)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
@ -239,7 +239,7 @@ func (p process) startSubscriber() {
|
|||
err := p.procFunc(p.ctx, p.procFuncCh)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: spawnWorker: start procFunc failed: %v", err)
|
||||
p.errorKernel.errSend(p, Message{}, er)
|
||||
p.errorKernel.errSend(p, Message{}, er, logError)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
@ -253,7 +253,7 @@ func (p process) startSubscriber() {
|
|||
err := p.natsSubscription.Unsubscribe()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: spawnWorker: got <-ctx.Done, but unable to unsubscribe natsSubscription failed: %v", err)
|
||||
p.errorKernel.errSend(p, Message{}, er)
|
||||
p.errorKernel.errSend(p, Message{}, er, logError)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
}
|
||||
|
||||
|
@ -475,13 +475,13 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
|
|||
zr, err := zstd.NewReader(nil)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: zstd NewReader failed: %v", err)
|
||||
p.errorKernel.errSend(p, Message{}, er)
|
||||
p.errorKernel.errSend(p, Message{}, er, logWarning)
|
||||
return
|
||||
}
|
||||
msgData, err = zr.DecodeAll(msg.Data, nil)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: zstd decoding failed: %v", err)
|
||||
p.errorKernel.errSend(p, Message{}, er)
|
||||
p.errorKernel.errSend(p, Message{}, er, logWarning)
|
||||
zr.Close()
|
||||
return
|
||||
}
|
||||
|
@ -493,14 +493,14 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
|
|||
gr, err := gzip.NewReader(r)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: gzip NewReader failed: %v", err)
|
||||
p.errorKernel.errSend(p, Message{}, er)
|
||||
p.errorKernel.errSend(p, Message{}, er, logError)
|
||||
return
|
||||
}
|
||||
|
||||
b, err := io.ReadAll(gr)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: gzip ReadAll failed: %v", err)
|
||||
p.errorKernel.errSend(p, Message{}, er)
|
||||
p.errorKernel.errSend(p, Message{}, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -521,7 +521,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
|
|||
err := cbor.Unmarshal(msgData, &message)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: cbor decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
|
||||
p.errorKernel.errSend(p, message, er)
|
||||
p.errorKernel.errSend(p, message, er, logError)
|
||||
return
|
||||
}
|
||||
default: // Deaults to gob if no match was found.
|
||||
|
@ -531,7 +531,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
|
|||
err := gobDec.Decode(&message)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
|
||||
p.errorKernel.errSend(p, message, er)
|
||||
p.errorKernel.errSend(p, message, er, logError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -544,7 +544,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
|
|||
err := gobDec.Decode(&message)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
|
||||
p.errorKernel.errSend(p, message, er)
|
||||
p.errorKernel.errSend(p, message, er, logError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -578,7 +578,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
|
|||
p.handler = mh.handler
|
||||
if !ok {
|
||||
er := fmt.Errorf("error: subscriberHandler: no such method type: %v", p.subject.Event)
|
||||
p.errorKernel.errSend(p, message, er)
|
||||
p.errorKernel.errSend(p, message, er, logWarning)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -603,7 +603,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
|
|||
p.handler = mh.handler
|
||||
if !ok {
|
||||
er := fmt.Errorf("error: subscriberHandler: no such method type: %v", p.subject.Event)
|
||||
p.errorKernel.errSend(p, message, er)
|
||||
p.errorKernel.errSend(p, message, er, logWarning)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -635,7 +635,7 @@ func (p process) callHandler(message Message, thisNode string) []byte {
|
|||
case false:
|
||||
// ACL/Signature checking failed.
|
||||
er := fmt.Errorf("error: subscriberHandler: ACL were verified not-OK, doing nothing")
|
||||
p.errorKernel.errSend(p, message, er)
|
||||
p.errorKernel.errSend(p, message, er, logWarning)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
}()
|
||||
|
@ -676,7 +676,7 @@ func executeHandler(p process, message Message, thisNode string) {
|
|||
_, err = p.handler(p, message, thisNode)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
|
||||
p.errorKernel.errSend(p, message, er)
|
||||
p.errorKernel.errSend(p, message, er, logError)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
}
|
||||
}()
|
||||
|
@ -700,7 +700,7 @@ func executeHandler(p process, message Message, thisNode string) {
|
|||
_, err := p.handler(p, message, thisNode)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
|
||||
p.errorKernel.errSend(p, message, er)
|
||||
p.errorKernel.errSend(p, message, er, logError)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
}
|
||||
}()
|
||||
|
@ -726,7 +726,7 @@ func executeHandler(p process, message Message, thisNode string) {
|
|||
_, err := p.handler(p, message, thisNode)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
|
||||
p.errorKernel.errSend(p, message, er)
|
||||
p.errorKernel.errSend(p, message, er, logError)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
}
|
||||
}()
|
||||
|
|
|
@ -307,7 +307,7 @@ func (s startup) pubREQHello(p process) {
|
|||
sam, err := newSubjectAndMessage(m)
|
||||
if err != nil {
|
||||
// In theory the system should drop the message before it reaches here.
|
||||
p.errorKernel.errSend(p, m, err)
|
||||
p.errorKernel.errSend(p, m, err, logError)
|
||||
log.Printf("error: ProcessesStart: %v\n", err)
|
||||
}
|
||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||
|
@ -364,7 +364,7 @@ func (s startup) pubREQKeysRequestUpdate(p process) {
|
|||
sam, err := newSubjectAndMessage(m)
|
||||
if err != nil {
|
||||
// In theory the system should drop the message before it reaches here.
|
||||
p.errorKernel.errSend(p, m, err)
|
||||
p.errorKernel.errSend(p, m, err, logError)
|
||||
log.Printf("error: ProcessesStart: %v\n", err)
|
||||
}
|
||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||
|
@ -421,7 +421,7 @@ func (s startup) pubREQAclRequestUpdate(p process) {
|
|||
sam, err := newSubjectAndMessage(m)
|
||||
if err != nil {
|
||||
// In theory the system should drop the message before it reaches here.
|
||||
p.errorKernel.errSend(p, m, err)
|
||||
p.errorKernel.errSend(p, m, err, logError)
|
||||
log.Printf("error: ProcessesStart: %v\n", err)
|
||||
}
|
||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||
|
|
|
@ -480,7 +480,7 @@ func newReplyMessage(proc process, message Message, outData []byte) {
|
|||
if err != nil {
|
||||
// In theory the system should drop the message before it reaches here.
|
||||
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logError)
|
||||
}
|
||||
|
||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||
|
|
|
@ -81,7 +81,7 @@ func (m methodREQAclRequestUpdate) handler(proc process, message Message, node s
|
|||
js, err := json.Marshal(hdh)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: REQAclRequestUpdate : json marshal failed: %v, message: %v", err, message)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
|
||||
er = fmt.Errorf("----> subscriber methodREQAclRequestUpdate: SENDING ACL'S TO NODE=%v, serializedAndHash=%+v", message.FromNode, hdh)
|
||||
|
@ -145,7 +145,7 @@ func (m methodREQAclDeliverUpdate) handler(proc process, message Message, node s
|
|||
err := json.Unmarshal(message.Data, &hdh)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: subscriber REQAclDeliverUpdate : json unmarshal failed: %v, message: %v", err, message)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
|
||||
mapOfFromNodeCommands := make(map[Node]map[command]struct{})
|
||||
|
@ -154,7 +154,7 @@ func (m methodREQAclDeliverUpdate) handler(proc process, message Message, node s
|
|||
err = cbor.Unmarshal(hdh.Data, &mapOfFromNodeCommands)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: subscriber REQAclDeliverUpdate : cbor unmarshal failed: %v, message: %v", err, message)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logError)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -168,7 +168,7 @@ func (m methodREQAclDeliverUpdate) handler(proc process, message Message, node s
|
|||
err = proc.nodeAuth.nodeAcl.saveToFile()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: subscriber REQAclDeliverUpdate : save to file failed: %v, message: %v", err, message)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logError)
|
||||
}
|
||||
|
||||
// Prepare and queue for sending a new message with the output
|
||||
|
@ -233,12 +233,12 @@ func (m methodREQAclAddCommand) handler(proc process, message Message, node stri
|
|||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
proc.errorKernel.errSend(proc, message, err)
|
||||
proc.errorKernel.errSend(proc, message, err, logError)
|
||||
|
||||
case <-ctx.Done():
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQAclAddAccessList: method timed out: %v", message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logInfo)
|
||||
|
||||
case out := <-outCh:
|
||||
// Prepare and queue for sending a new message with the output
|
||||
|
@ -303,12 +303,12 @@ func (m methodREQAclDeleteCommand) handler(proc process, message Message, node s
|
|||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
proc.errorKernel.errSend(proc, message, err)
|
||||
proc.errorKernel.errSend(proc, message, err, logError)
|
||||
|
||||
case <-ctx.Done():
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQAclDeleteCommand: method timed out: %v", message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logInfo)
|
||||
|
||||
case out := <-outCh:
|
||||
// Prepare and queue for sending a new message with the output
|
||||
|
@ -372,12 +372,12 @@ func (m methodREQAclDeleteSource) handler(proc process, message Message, node st
|
|||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
proc.errorKernel.errSend(proc, message, err)
|
||||
proc.errorKernel.errSend(proc, message, err, logError)
|
||||
|
||||
case <-ctx.Done():
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQAclDeleteSource: method timed out: %v", message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logInfo)
|
||||
|
||||
case out := <-outCh:
|
||||
// Prepare and queue for sending a new message with the output
|
||||
|
@ -441,12 +441,12 @@ func (m methodREQAclGroupNodesAddNode) handler(proc process, message Message, no
|
|||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
proc.errorKernel.errSend(proc, message, err)
|
||||
proc.errorKernel.errSend(proc, message, err, logError)
|
||||
|
||||
case <-ctx.Done():
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQAclGroupNodesAddNode: method timed out: %v", message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logInfo)
|
||||
|
||||
case out := <-outCh:
|
||||
// Prepare and queue for sending a new message with the output
|
||||
|
@ -510,12 +510,12 @@ func (m methodREQAclGroupNodesDeleteNode) handler(proc process, message Message,
|
|||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
proc.errorKernel.errSend(proc, message, err)
|
||||
proc.errorKernel.errSend(proc, message, err, logError)
|
||||
|
||||
case <-ctx.Done():
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQAclGroupNodesDeleteNode: method timed out: %v", message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logInfo)
|
||||
|
||||
case out := <-outCh:
|
||||
// Prepare and queue for sending a new message with the output
|
||||
|
@ -578,12 +578,12 @@ func (m methodREQAclGroupNodesDeleteGroup) handler(proc process, message Message
|
|||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
proc.errorKernel.errSend(proc, message, err)
|
||||
proc.errorKernel.errSend(proc, message, err, logError)
|
||||
|
||||
case <-ctx.Done():
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQAclGroupNodesDeleteGroup: method timed out: %v", message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logInfo)
|
||||
|
||||
case out := <-outCh:
|
||||
// Prepare and queue for sending a new message with the output
|
||||
|
@ -647,12 +647,12 @@ func (m methodREQAclGroupCommandsAddCommand) handler(proc process, message Messa
|
|||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
proc.errorKernel.errSend(proc, message, err)
|
||||
proc.errorKernel.errSend(proc, message, err, logError)
|
||||
|
||||
case <-ctx.Done():
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQAclGroupCommandsAddCommand: method timed out: %v", message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logInfo)
|
||||
|
||||
case out := <-outCh:
|
||||
// Prepare and queue for sending a new message with the output
|
||||
|
@ -716,12 +716,12 @@ func (m methodREQAclGroupCommandsDeleteCommand) handler(proc process, message Me
|
|||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
proc.errorKernel.errSend(proc, message, err)
|
||||
proc.errorKernel.errSend(proc, message, err, logError)
|
||||
|
||||
case <-ctx.Done():
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQAclGroupCommandsDeleteCommand: method timed out: %v", message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logInfo)
|
||||
|
||||
case out := <-outCh:
|
||||
// Prepare and queue for sending a new message with the output
|
||||
|
@ -784,12 +784,12 @@ func (m methodREQAclGroupCommandsDeleteGroup) handler(proc process, message Mess
|
|||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
proc.errorKernel.errSend(proc, message, err)
|
||||
proc.errorKernel.errSend(proc, message, err, logError)
|
||||
|
||||
case <-ctx.Done():
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQAclGroupCommandsDeleteGroup: method timed out: %v", message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logInfo)
|
||||
|
||||
case out := <-outCh:
|
||||
// Prepare and queue for sending a new message with the output
|
||||
|
@ -848,12 +848,12 @@ func (m methodREQAclExport) handler(proc process, message Message, node string)
|
|||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
proc.errorKernel.errSend(proc, message, err)
|
||||
proc.errorKernel.errSend(proc, message, err, logError)
|
||||
|
||||
case <-ctx.Done():
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQAclExport: method timed out: %v", message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logInfo)
|
||||
|
||||
case out := <-outCh:
|
||||
// Prepare and queue for sending a new message with the output
|
||||
|
@ -920,12 +920,12 @@ func (m methodREQAclImport) handler(proc process, message Message, node string)
|
|||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
proc.errorKernel.errSend(proc, message, err)
|
||||
proc.errorKernel.errSend(proc, message, err, logError)
|
||||
|
||||
case <-ctx.Done():
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQAclImport: method timed out")
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logInfo)
|
||||
|
||||
case out := <-outCh:
|
||||
// Prepare and queue for sending a new message with the output
|
||||
|
|
|
@ -39,7 +39,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
|
|||
switch {
|
||||
case len(message.MethodArgs) < 1:
|
||||
er := fmt.Errorf("error: methodREQCliCommand: got <1 number methodArgs")
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
newReplyMessage(proc, msgForErrors, []byte(er.Error()))
|
||||
|
||||
return
|
||||
|
@ -90,7 +90,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
|
|||
err := cmd.Run()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQCliCommand: cmd.Run failed : %v, methodArgs: %v, error_output: %v", err, message.MethodArgs, stderr.String())
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
newReplyMessage(proc, msgForErrors, []byte(er.Error()))
|
||||
}
|
||||
|
||||
|
@ -105,7 +105,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
|
|||
case <-ctx.Done():
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQCliCommand: method timed out: %v", message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
newReplyMessage(proc, msgForErrors, []byte(er.Error()))
|
||||
case out := <-outCh:
|
||||
cancel()
|
||||
|
@ -168,7 +168,7 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
|
|||
switch {
|
||||
case len(message.MethodArgs) < 1:
|
||||
er := fmt.Errorf("error: methodREQCliCommand: got <1 number methodArgs")
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
newReplyMessage(proc, msgForErrors, []byte(er.Error()))
|
||||
|
||||
return
|
||||
|
@ -197,20 +197,20 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
|
|||
outReader, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StdoutPipe failed : %v, methodArgs: %v", err, message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
newReplyMessage(proc, msgForErrors, []byte(er.Error()))
|
||||
}
|
||||
|
||||
ErrorReader, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StderrPipe failed : %v, methodArgs: %v", err, message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
newReplyMessage(proc, msgForErrors, []byte(er.Error()))
|
||||
}
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.Start failed : %v, methodArgs: %v", err, message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
newReplyMessage(proc, msgForErrors, []byte(er.Error()))
|
||||
}
|
||||
|
||||
|
@ -241,7 +241,7 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
|
|||
|
||||
if err := cmd.Wait(); err != nil {
|
||||
er := fmt.Errorf("info: methodREQCliCommandCont: method timeout reached, canceled: methodArgs: %v, %v", message.MethodArgs, err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
|
||||
}()
|
||||
|
|
|
@ -98,7 +98,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
|
|||
sam, err := newSubjectAndMessage(message)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: newSubjectAndMessage failed: %v, message=%v", err, message)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
|
||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||
|
@ -128,7 +128,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
|
|||
|
||||
if len(message.MethodArgs) < 3 {
|
||||
er := fmt.Errorf("error: methodREQCopySrc: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath")
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -138,7 +138,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
|
|||
splitChunkSize, err = strconv.Atoi(message.MethodArgs[3])
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQCopySrc: unble to convert splitChunkSize into int value: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -148,7 +148,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
|
|||
maxTotalCopyTime, err = strconv.Atoi(message.MethodArgs[4])
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQCopySrc: unble to convert maxTotalCopyTime into int value: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -164,7 +164,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
|
|||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQCopySrc: unable to convert folderPermission into int value: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -243,7 +243,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
|
|||
cb, err := cbor.Marshal(cia)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
cancel()
|
||||
}
|
||||
|
||||
|
@ -262,7 +262,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
|
|||
sam, err := newSubjectAndMessage(msg)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: newSubjectAndMessage failed: %v, message=%v", err, message)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
cancel()
|
||||
}
|
||||
|
||||
|
@ -312,7 +312,7 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([
|
|||
err := cbor.Unmarshal(message.Data, &cia)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQCopyDst: failed to cbor Unmarshal data: %v, message=%v", err, message)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -460,7 +460,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
|
|||
fh, err := os.Open(cia.SrcFilePath)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: copySrcSubProcFunc: failed to open file: %v", err)
|
||||
proc.errorKernel.errSend(proc, Message{}, er)
|
||||
proc.errorKernel.errSend(proc, Message{}, er, logWarning)
|
||||
newReplyMessage(proc, msgForSubErrors, []byte(er.Error()))
|
||||
return er
|
||||
}
|
||||
|
@ -480,7 +480,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
|
|||
err := cbor.Unmarshal(message.Data, &csa)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: copySrcSubHandler: cbor unmarshal of csa failed: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
newReplyMessage(proc, msgForSubErrors, []byte(er.Error()))
|
||||
return er
|
||||
}
|
||||
|
@ -495,7 +495,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
|
|||
n, err := fh.Read(b)
|
||||
if err != nil && err != io.EOF {
|
||||
er := fmt.Errorf("error: copySrcSubHandler: failed to read chunk from file: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
newReplyMessage(proc, msgForSubErrors, []byte(er.Error()))
|
||||
return er
|
||||
}
|
||||
|
@ -524,7 +524,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
|
|||
csaSerialized, err := cbor.Marshal(csa)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: copySrcSubProcFunc: cbor marshal of csa failed: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
newReplyMessage(proc, msgForSubErrors, []byte(er.Error()))
|
||||
return er
|
||||
}
|
||||
|
@ -546,7 +546,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
|
|||
sam, err := newSubjectAndMessage(msg)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("copySrcProcSubFunc: newSubjectAndMessage failed: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
newReplyMessage(proc, msgForSubErrors, []byte(er.Error()))
|
||||
return er
|
||||
}
|
||||
|
@ -568,7 +568,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
|
|||
case copyResendLast:
|
||||
if resendRetries > message.Retries {
|
||||
er := fmt.Errorf("error: %v: failed to resend the chunk for the %v time, giving up", cia.DstMethod, resendRetries)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
newReplyMessage(proc, msgForSubErrors, []byte(er.Error()))
|
||||
// NB: Should we call cancel here, or wait for the timeout ?
|
||||
proc.ctxCancel()
|
||||
|
@ -594,7 +594,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
|
|||
csaSerialized, err := cbor.Marshal(csa)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
newReplyMessage(proc, msgForSubErrors, []byte(er.Error()))
|
||||
return er
|
||||
}
|
||||
|
@ -616,7 +616,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
|
|||
sam, err := newSubjectAndMessage(msg)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
newReplyMessage(proc, msgForSubErrors, []byte(er.Error()))
|
||||
return er
|
||||
}
|
||||
|
@ -633,7 +633,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
|
|||
|
||||
default:
|
||||
er := fmt.Errorf("error: copySrcSubProcFunc: not valid copyStatus, exiting: %v", csa.CopyStatus)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
newReplyMessage(proc, msgForSubErrors, []byte(er.Error()))
|
||||
return er
|
||||
}
|
||||
|
@ -657,7 +657,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
|||
csaSerialized, err := cbor.Marshal(csa)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
return er
|
||||
}
|
||||
|
||||
|
@ -679,7 +679,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
|||
sam, err := newSubjectAndMessage(msg)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
return er
|
||||
}
|
||||
|
||||
|
@ -691,7 +691,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
|||
err = os.Mkdir(tmpFolder, 0770)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("copyDstProcSubFunc: create tmp folder for copying failed: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
return er
|
||||
}
|
||||
|
||||
|
@ -699,7 +699,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
|||
err = os.RemoveAll(tmpFolder)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: copyDstSubProcFunc: remove temp dir failed: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -714,7 +714,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
|||
err := cbor.Unmarshal(message.Data, &csa)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: copySrcSubHandler: cbor unmarshal of csa failed: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
return er
|
||||
}
|
||||
|
||||
|
@ -749,7 +749,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
|||
}()
|
||||
|
||||
if err != nil {
|
||||
proc.errorKernel.errSend(proc, message, err)
|
||||
proc.errorKernel.errSend(proc, message, err, logWarning)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -761,7 +761,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
|||
csaSer, err := cbor.Marshal(csa)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
return er
|
||||
}
|
||||
|
||||
|
@ -781,7 +781,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
|||
sam, err := newSubjectAndMessage(msg)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
return er
|
||||
}
|
||||
|
||||
|
@ -794,7 +794,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
|||
csaSer, err := cbor.Marshal(csa)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
return er
|
||||
}
|
||||
|
||||
|
@ -814,7 +814,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
|||
sam, err := newSubjectAndMessage(msg)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
return er
|
||||
}
|
||||
|
||||
|
@ -847,7 +847,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
|||
mainfh, err := os.OpenFile(filePath, os.O_TRUNC|os.O_RDWR|os.O_CREATE|os.O_SYNC, cia.FileMode)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: copyDstSubProcFunc: open final destination file failed: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
return er
|
||||
}
|
||||
defer mainfh.Close()
|
||||
|
@ -893,7 +893,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
|||
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: copyDstSubProcFunc: creation of slice of chunk paths failed: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
return er
|
||||
}
|
||||
|
||||
|
@ -925,14 +925,14 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
|||
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: copyDstSubProcFunc: write to final destination file failed: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
|
||||
// Remove the backup file.
|
||||
err = os.Remove(backupOriginalFileName)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
er := fmt.Errorf("error: copyDstSubProcFunc: remove of backup of original file failed: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
|
||||
er = fmt.Errorf("info: copy: successfully wrote all split chunk files into file=%v", filePath)
|
||||
|
@ -947,7 +947,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
|||
csaSerialized, err := cbor.Marshal(csa)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
return er
|
||||
}
|
||||
|
||||
|
@ -968,7 +968,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
|||
sam, err := newSubjectAndMessage(msg)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
return er
|
||||
}
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin
|
|||
err := os.MkdirAll(folderTree, 0770)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQToFileAppend: failed to create toFileAppend directory tree:%v, subject: %v, %v", folderTree, proc.subject, err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
|
||||
er := fmt.Errorf("info: Creating subscribers data folder at %v", folderTree)
|
||||
|
@ -40,7 +40,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin
|
|||
f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0660)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQToFileAppend.handler: failed to open file: %v, %v", file, err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
@ -49,7 +49,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin
|
|||
f.Sync()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file : %v, %v", file, err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
|
||||
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||||
|
@ -79,7 +79,7 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([]
|
|||
err := os.MkdirAll(folderTree, 0770)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQToFile failed to create toFile directory tree: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
|
||||
return nil, er
|
||||
}
|
||||
|
@ -93,7 +93,7 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([]
|
|||
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
@ -103,7 +103,7 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([]
|
|||
f.Sync()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: file: %v, %v", file, err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
|
||||
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||||
|
@ -134,7 +134,7 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) (
|
|||
switch {
|
||||
case len(message.MethodArgs) < 1:
|
||||
er := fmt.Errorf("error: methodREQTailFile: got <1 number methodArgs")
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
|
||||
return
|
||||
}
|
||||
|
@ -161,7 +161,7 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) (
|
|||
}})
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQToTailFile: tailFile: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
|
||||
proc.processes.wg.Add(1)
|
||||
|
|
|
@ -32,7 +32,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
|
|||
switch {
|
||||
case len(message.MethodArgs) < 1:
|
||||
er := fmt.Errorf("error: methodREQHttpGet: got <1 number methodArgs")
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
newReplyMessage(proc, msgForErrors, []byte(er.Error()))
|
||||
|
||||
return
|
||||
|
@ -50,7 +50,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
|
|||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, bailing out: %v", err, message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
newReplyMessage(proc, msgForErrors, []byte(er.Error()))
|
||||
cancel()
|
||||
return
|
||||
|
@ -65,7 +65,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
|
|||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, bailing out: %v", err, message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
newReplyMessage(proc, msgForErrors, []byte(er.Error()))
|
||||
return
|
||||
}
|
||||
|
@ -74,7 +74,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
|
|||
if resp.StatusCode != 200 {
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQHttpGet: not 200, were %#v, bailing out: %v", resp.StatusCode, message)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
newReplyMessage(proc, msgForErrors, []byte(er.Error()))
|
||||
return
|
||||
}
|
||||
|
@ -82,7 +82,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
|
|||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, methodArgs: %v", err, message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
newReplyMessage(proc, msgForErrors, []byte(er.Error()))
|
||||
}
|
||||
|
||||
|
@ -99,7 +99,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
|
|||
case <-ctx.Done():
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQHttpGet: method timed out: %v", message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
newReplyMessage(proc, msgForErrors, []byte(er.Error()))
|
||||
case out := <-outCh:
|
||||
cancel()
|
||||
|
@ -140,7 +140,7 @@ func (m methodREQHttpGetScheduled) handler(proc process, message Message, node s
|
|||
switch {
|
||||
case len(message.MethodArgs) < 3:
|
||||
er := fmt.Errorf("error: methodREQHttpGet: got <3 number methodArgs. Want URL, Schedule Interval in seconds, and the total time in minutes the scheduler should run for")
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
|
||||
return
|
||||
}
|
||||
|
@ -150,14 +150,14 @@ func (m methodREQHttpGetScheduled) handler(proc process, message Message, node s
|
|||
scheduleInterval, err := strconv.Atoi(message.MethodArgs[1])
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQHttpGetScheduled: schedule interval value is not a valid int number defined as a string value seconds: %v, bailing out: %v", err, message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
schedulerTotalTime, err := strconv.Atoi(message.MethodArgs[2])
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQHttpGetScheduled: scheduler total time value is not a valid int number defined as a string value minutes: %v, bailing out: %v", err, message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -191,7 +191,7 @@ func (m methodREQHttpGetScheduled) handler(proc process, message Message, node s
|
|||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, error: %v", err, message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
|
@ -204,7 +204,7 @@ func (m methodREQHttpGetScheduled) handler(proc process, message Message, node s
|
|||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, error: %v", err, message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
@ -212,14 +212,14 @@ func (m methodREQHttpGetScheduled) handler(proc process, message Message, node s
|
|||
if resp.StatusCode != 200 {
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQHttpGet: not 200, were %#v, error: %v", resp.StatusCode, message)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, methodArgs: %v", err, message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
|
||||
out := body
|
||||
|
@ -250,7 +250,7 @@ func (m methodREQHttpGetScheduled) handler(proc process, message Message, node s
|
|||
// fmt.Printf(" * DEBUG: <-ctxScheduler.Done()\n")
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQHttpGet: schedule context timed out: %v", message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
return
|
||||
case out := <-outCh:
|
||||
// Prepare and queue for sending a new message with the output
|
||||
|
|
|
@ -122,7 +122,7 @@ func (m methodREQKeysRequestUpdate) handler(proc process, message Message, node
|
|||
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQKeysRequestUpdate, failed to marshal keys map: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
er = fmt.Errorf("----> methodREQKeysRequestUpdate: SENDING KEYS TO NODE=%v", message.FromNode)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
|
||||
|
@ -183,7 +183,7 @@ func (m methodREQKeysDeliverUpdate) handler(proc process, message Message, node
|
|||
err := json.Unmarshal(message.Data, &keysAndHash)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: REQKeysDeliverUpdate : json unmarshal failed: %v, message: %v", err, message)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
|
||||
er := fmt.Errorf("<---- REQKeysDeliverUpdate: after unmarshal, nodeAuth keysAndhash contains: %+v", keysAndHash)
|
||||
|
@ -201,7 +201,7 @@ func (m methodREQKeysDeliverUpdate) handler(proc process, message Message, node
|
|||
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: REQKeysDeliverUpdate : json unmarshal failed: %v, message: %v", err, message)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
|
||||
// We need to also persist the hash on the receiving nodes. We can then load
|
||||
|
@ -210,7 +210,7 @@ func (m methodREQKeysDeliverUpdate) handler(proc process, message Message, node
|
|||
err = proc.nodeAuth.publicKeys.saveToFile()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: REQKeysDeliverUpdate : save to file failed: %v, message: %v", err, message)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
|
||||
// Prepare and queue for sending a new message with the output
|
||||
|
@ -304,7 +304,7 @@ func (m methodREQKeysAllow) handler(proc process, message Message, node string)
|
|||
err := pushKeys(proc, message, []Node{})
|
||||
|
||||
if err != nil {
|
||||
proc.errorKernel.errSend(proc, message, err)
|
||||
proc.errorKernel.errSend(proc, message, err, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -363,7 +363,7 @@ func pushKeys(proc process, message Message, nodes []Node) error {
|
|||
if err != nil {
|
||||
// In theory the system should drop the message before it reaches here.
|
||||
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
|
||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||
|
@ -377,7 +377,7 @@ func pushKeys(proc process, message Message, nodes []Node) error {
|
|||
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQKeysAllow, failed to marshal keys map: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
|
||||
proc.centralAuth.pki.nodesAcked.mu.Lock()
|
||||
|
@ -410,7 +410,7 @@ func pushKeys(proc process, message Message, nodes []Node) error {
|
|||
if err != nil {
|
||||
// In theory the system should drop the message before it reaches here.
|
||||
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
|
||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||
|
@ -480,7 +480,7 @@ func (m methodREQKeysDelete) handler(proc process, message Message, node string)
|
|||
err := pushKeys(proc, message, nodes)
|
||||
|
||||
if err != nil {
|
||||
proc.errorKernel.errSend(proc, message, err)
|
||||
proc.errorKernel.errSend(proc, message, err, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -496,12 +496,12 @@ func (m methodREQKeysDelete) handler(proc process, message Message, node string)
|
|||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
proc.errorKernel.errSend(proc, message, err)
|
||||
proc.errorKernel.errSend(proc, message, err, logWarning)
|
||||
|
||||
case <-ctx.Done():
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQAclGroupNodesDeleteNode: method timed out: %v", message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
|
||||
case out := <-outCh:
|
||||
|
||||
|
|
|
@ -68,7 +68,7 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str
|
|||
switch {
|
||||
case len(message.MethodArgs) < 1:
|
||||
er := fmt.Errorf("error: methodREQOpProcessStart: got <1 number methodArgs")
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -77,7 +77,7 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str
|
|||
tmpH := mt.getHandler(Method(method))
|
||||
if tmpH == nil {
|
||||
er := fmt.Errorf("error: OpProcessStart: no such request type defined: %v" + m)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -88,7 +88,7 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str
|
|||
|
||||
txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
|
||||
er := fmt.Errorf(txt)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
|
||||
out = []byte(txt + "\n")
|
||||
newReplyMessage(proc, message, out)
|
||||
|
@ -133,7 +133,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri
|
|||
|
||||
if v := len(message.MethodArgs); v != 3 {
|
||||
er := fmt.Errorf("error: methodREQOpProcessStop: got <4 number methodArgs, want: method,node,kind")
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
|
||||
methodString := message.MethodArgs[0]
|
||||
|
@ -144,7 +144,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri
|
|||
tmpH := mt.getHandler(Method(method))
|
||||
if tmpH == nil {
|
||||
er := fmt.Errorf("error: OpProcessStop: no such request type defined: %v, check that the methodArgs are correct: " + methodString)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -170,7 +170,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri
|
|||
err := toStopProc.natsSubscription.Unsubscribe()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQOpStopProcess failed to stop nats.Subscription: %v, methodArgs: %v", err, message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
|
||||
// Remove the prometheus label
|
||||
|
@ -178,7 +178,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri
|
|||
|
||||
txt := fmt.Sprintf("info: OpProcessStop: process stopped id: %v, method: %v on: %v", toStopProc.processID, sub, message.ToNode)
|
||||
er := fmt.Errorf(txt)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
|
||||
out = []byte(txt + "\n")
|
||||
newReplyMessage(proc, message, out)
|
||||
|
@ -186,7 +186,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri
|
|||
} else {
|
||||
txt := fmt.Sprintf("error: OpProcessStop: did not find process to stop: %v on %v", sub, message.ToNode)
|
||||
er := fmt.Errorf(txt)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
|
||||
out = []byte(txt + "\n")
|
||||
newReplyMessage(proc, message, out)
|
||||
|
|
|
@ -51,7 +51,7 @@ func (m methodREQHello) handler(proc process, message Message, node string) ([]b
|
|||
f.Sync()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
|
||||
// --------------------------
|
||||
|
@ -106,7 +106,7 @@ func (m methodREQErrorLog) handler(proc process, message Message, node string) (
|
|||
f.Sync()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
|
||||
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||||
|
@ -136,7 +136,7 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by
|
|||
err := os.MkdirAll(folderTree, 0770)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQPing.handler: failed to create toFile directory tree: %v, %v", folderTree, err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
|
||||
return nil, er
|
||||
}
|
||||
|
@ -150,7 +150,7 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by
|
|||
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQPing.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
@ -162,7 +162,7 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by
|
|||
f.Sync()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQPing.handler: failed to write to file: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
|
||||
proc.processes.wg.Add(1)
|
||||
|
@ -199,7 +199,7 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by
|
|||
err := os.MkdirAll(folderTree, 0770)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQPong.handler: failed to create toFile directory tree %v: %v", folderTree, err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
|
||||
return nil, er
|
||||
}
|
||||
|
@ -213,7 +213,7 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by
|
|||
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQPong.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
@ -225,7 +225,7 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by
|
|||
f.Sync()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQPong.handler: failed to write to file: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
|
||||
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||||
|
@ -252,7 +252,7 @@ func (m methodREQToConsole) handler(proc process, message Message, node string)
|
|||
proc.processes.tui.toConsoleCh <- message.Data
|
||||
} else {
|
||||
er := fmt.Errorf("error: no tui client started")
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
case len(message.MethodArgs) > 0 && message.MethodArgs[0] == "stderr":
|
||||
log.Printf("* DEBUG: MethodArgs: got stderr \n")
|
||||
|
@ -285,7 +285,7 @@ func (m methodREQTuiToConsole) handler(proc process, message Message, node strin
|
|||
proc.processes.tui.toConsoleCh <- message.Data
|
||||
} else {
|
||||
er := fmt.Errorf("error: no tui client started")
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||
}
|
||||
|
||||
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||||
|
|
|
@ -203,14 +203,14 @@ func (r *ringBuffer) fillBuffer(ctx context.Context, inCh chan subjectAndMessage
|
|||
js, err := json.Marshal(samV)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error:fillBuffer: json marshaling: %v", err)
|
||||
r.errorKernel.errSend(r.processInitial, Message{}, er)
|
||||
r.errorKernel.errSend(r.processInitial, Message{}, er, logError)
|
||||
}
|
||||
|
||||
// Store the incomming message in key/value store
|
||||
err = r.dbUpdate(r.db, r.samValueBucket, strconv.Itoa(dbID), js)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: dbUpdate samValue failed: %v", err)
|
||||
r.errorKernel.errSend(r.processInitial, Message{}, er)
|
||||
r.errorKernel.errSend(r.processInitial, Message{}, er, logError)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -328,7 +328,7 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB
|
|||
js, err := json.Marshal(msgForPermStore)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error:fillBuffer: json marshaling: %v", err)
|
||||
r.errorKernel.errSend(r.processInitial, Message{}, er)
|
||||
r.errorKernel.errSend(r.processInitial, Message{}, er, logError)
|
||||
}
|
||||
r.permStore <- time.Now().Format("Mon Jan _2 15:04:05 2006") + ", " + string(js) + "\n"
|
||||
|
||||
|
|
|
@ -352,7 +352,7 @@ func (s *server) directSAMSChRead() {
|
|||
mh, ok := p.methodsAvailable.CheckIfExists(sams[i].Message.Method)
|
||||
if !ok {
|
||||
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Event)
|
||||
p.errorKernel.errSend(p, sams[i].Message, er)
|
||||
p.errorKernel.errSend(p, sams[i].Message, er, logError)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -453,7 +453,7 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
|
|||
// Check if the format of the message is correct.
|
||||
if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok {
|
||||
er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method)
|
||||
s.errorKernel.errSend(s.processInitial, sam.Message, er)
|
||||
s.errorKernel.errSend(s.processInitial, sam.Message, er, logError)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue