mirror of
https://github.com/postmannen/ctrl.git
synced 2025-03-31 01:24:31 +00:00
errorKernel, send error for all requests
This commit is contained in:
parent
db60d69289
commit
024842b99c
2 changed files with 65 additions and 58 deletions
|
@ -78,7 +78,8 @@ func (e *errorKernel) start(newMessagesCh chan<- []subjectAndMessage) error {
|
|||
// the operator....or other ?
|
||||
switch errEvent.errorType {
|
||||
|
||||
case errSend:
|
||||
case errTypeSendToCentralErrorLogger:
|
||||
fmt.Printf(" * case errTypeSend\n")
|
||||
// Just log the error, and don't use the errorAction channel
|
||||
// so the process who sent the error don't have to wait for
|
||||
// the error message to be sent before it can continue.
|
||||
|
@ -105,7 +106,10 @@ func (e *errorKernel) start(newMessagesCh chan<- []subjectAndMessage) error {
|
|||
e.metrics.promErrorMessagesSentTotal.Inc()
|
||||
}()
|
||||
|
||||
default:
|
||||
case errTypeWithAction:
|
||||
// TODO: Look into how to implement error actions.
|
||||
|
||||
fmt.Printf(" * case errTypeWithAction\n")
|
||||
// Just print the error, and tell the process to continue. The
|
||||
// process who sent the error should block andwait for receiving
|
||||
// an errActionContinue message.
|
||||
|
@ -122,6 +126,9 @@ func (e *errorKernel) start(newMessagesCh chan<- []subjectAndMessage) error {
|
|||
return
|
||||
}
|
||||
}()
|
||||
|
||||
default:
|
||||
fmt.Printf(" * case default\n")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -133,9 +140,10 @@ func (e *errorKernel) stop() {
|
|||
// sendError will just send an error to the errorCentral.
|
||||
func (e *errorKernel) errSend(proc process, msg Message, err error) {
|
||||
ev := errorEvent{
|
||||
//errorType: logOnly,
|
||||
process: proc,
|
||||
message: msg,
|
||||
err: err,
|
||||
errorType: errTypeSendToCentralErrorLogger,
|
||||
process: proc,
|
||||
message: msg,
|
||||
// We don't want to create any actions when just
|
||||
// sending errors.
|
||||
// errorActionCh: make(chan errorAction),
|
||||
|
@ -144,10 +152,9 @@ func (e *errorKernel) errSend(proc process, msg Message, err error) {
|
|||
e.errorCh <- ev
|
||||
}
|
||||
|
||||
// sendError will just send an error to the errorCentral.
|
||||
// errWithAction
|
||||
//
|
||||
// NB: This func is for now only an idea for how to implement
|
||||
// the usage of actions and might not make any sense at all later....
|
||||
// TODO: Look into how to implement error actions.
|
||||
func (e *errorKernel) errWithAction(proc process, msg Message, err error) chan errorAction {
|
||||
// Create the channel where to receive what action to do.
|
||||
errAction := make(chan errorAction)
|
||||
|
@ -189,7 +196,8 @@ type errorType int
|
|||
const (
|
||||
// errSend will just send the content of the error to the
|
||||
// central error logger.
|
||||
errSend errorType = iota
|
||||
errTypeSendToCentralErrorLogger errorType = iota
|
||||
errTypeWithAction errorType = iota
|
||||
)
|
||||
|
||||
type errorEvent struct {
|
||||
|
|
97
requests.go
97
requests.go
|
@ -332,7 +332,7 @@ func newReplyMessage(proc process, message Message, outData []byte) {
|
|||
if err != nil {
|
||||
// In theory the system should drop the message before it reaches here.
|
||||
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
||||
|
@ -437,7 +437,7 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str
|
|||
switch {
|
||||
case len(message.MethodArgs) < 1:
|
||||
er := fmt.Errorf("error: methodREQOpProcessStart: got <1 number methodArgs")
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
return
|
||||
}
|
||||
|
@ -447,7 +447,7 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str
|
|||
tmpH := mt.getHandler(Method(method))
|
||||
if tmpH == nil {
|
||||
er := fmt.Errorf("error: OpProcessStart: no such request type defined: %v" + m)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -458,7 +458,7 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str
|
|||
|
||||
txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
|
||||
er := fmt.Errorf(txt)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
|
||||
// TODO: What should this look like ?
|
||||
out = []byte(txt + "\n")
|
||||
|
@ -504,7 +504,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri
|
|||
|
||||
if v := len(message.MethodArgs); v != 3 {
|
||||
er := fmt.Errorf("error: methodREQCliCommand: got <4 number methodArgs, want: method,node,kind")
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
||||
|
@ -516,7 +516,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri
|
|||
tmpH := mt.getHandler(Method(method))
|
||||
if tmpH == nil {
|
||||
er := fmt.Errorf("error: OpProcessStop: no such request type defined: %v, check that the methodArgs are correct: " + methodString)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -542,7 +542,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri
|
|||
err := toStopProc.natsSubscription.Unsubscribe()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQOpStopProcess failed to stop nats.Subscription: %v, methodArgs: %v", err, message.MethodArgs)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
||||
|
@ -551,7 +551,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri
|
|||
|
||||
txt := fmt.Sprintf("info: OpProcessStop: process stopped id: %v, method: %v on: %v", toStopProc.processID, sub, message.ToNode)
|
||||
er := fmt.Errorf(txt)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
|
||||
out = []byte(txt + "\n")
|
||||
|
@ -560,7 +560,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri
|
|||
} else {
|
||||
txt := fmt.Sprintf("error: OpProcessStop: did not find process to stop: %v on %v", sub, message.ToNode)
|
||||
er := fmt.Errorf(txt)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
|
||||
out = []byte(txt + "\n")
|
||||
|
@ -598,7 +598,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin
|
|||
err := os.MkdirAll(folderTree, 0700)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQToFileAppend: failed to create toFileAppend directory tree:%v, subject: %v, %v", folderTree, proc.subject, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
||||
|
@ -610,7 +610,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin
|
|||
f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQToFileAppend.handler: failed to open file: %v, %v", file, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
return nil, err
|
||||
}
|
||||
|
@ -621,7 +621,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin
|
|||
f.Sync()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file : %v, %v", file, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
}
|
||||
|
@ -653,7 +653,7 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([]
|
|||
err := os.MkdirAll(folderTree, 0700)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQToFile failed to create toFile directory tree: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
|
||||
return nil, er
|
||||
|
@ -667,7 +667,7 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([]
|
|||
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQToFile.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
return nil, err
|
||||
}
|
||||
|
@ -678,7 +678,7 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([]
|
|||
f.Sync()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodEventTextLogging.handler: failed to write to file: file: %v, %v", file, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
}
|
||||
|
@ -708,7 +708,7 @@ func (m methodREQCopyFileFrom) handler(proc process, message Message, node strin
|
|||
switch {
|
||||
case len(message.MethodArgs) < 3:
|
||||
er := fmt.Errorf("error: methodREQCliCommand: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath")
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
return
|
||||
}
|
||||
|
@ -734,11 +734,11 @@ func (m methodREQCopyFileFrom) handler(proc process, message Message, node strin
|
|||
select {
|
||||
case <-ctx.Done():
|
||||
er := fmt.Errorf("error: methodREQCopyFile: got <-ctx.Done(): %v", message.MethodArgs)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
|
||||
return
|
||||
case er := <-errCh:
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
|
||||
return
|
||||
case out := <-outCh:
|
||||
|
@ -762,7 +762,7 @@ func (m methodREQCopyFileFrom) handler(proc process, message Message, node strin
|
|||
sam, err := newSubjectAndMessage(msg)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
||||
|
@ -862,7 +862,7 @@ func (m methodREQCopyFileTo) handler(proc process, message Message, node string)
|
|||
switch {
|
||||
case len(message.MethodArgs) < 3:
|
||||
er := fmt.Errorf("error: methodREQCliCommand: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath")
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
return
|
||||
}
|
||||
|
@ -917,12 +917,12 @@ func (m methodREQCopyFileTo) handler(proc process, message Message, node string)
|
|||
select {
|
||||
case <-ctx.Done():
|
||||
er := fmt.Errorf("error: methodREQCopyFileTo: got <-ctx.Done(): %v", message.MethodArgs)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
return
|
||||
|
||||
case err := <-errCh:
|
||||
er := fmt.Errorf("error: methodREQCopyFileTo: %v", err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
return
|
||||
|
||||
case out := <-outCh:
|
||||
|
@ -1062,7 +1062,7 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by
|
|||
err := os.MkdirAll(folderTree, 0700)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQPing.handler: failed to create toFile directory tree: %v, %v", folderTree, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
|
||||
return nil, er
|
||||
|
@ -1076,7 +1076,7 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by
|
|||
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQPing.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1088,7 +1088,7 @@ func (m methodREQPing) handler(proc process, message Message, node string) ([]by
|
|||
f.Sync()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQPing.handler: failed to write to file: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
||||
|
@ -1126,7 +1126,7 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by
|
|||
err := os.MkdirAll(folderTree, 0700)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQPong.handler: failed to create toFile directory tree %v: %v", folderTree, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
|
||||
return nil, er
|
||||
|
@ -1140,7 +1140,7 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by
|
|||
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQPong.handler: failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1152,7 +1152,7 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by
|
|||
f.Sync()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQPong.handler: failed to write to file: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
||||
|
@ -1189,7 +1189,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
|
|||
switch {
|
||||
case len(message.MethodArgs) < 1:
|
||||
er := fmt.Errorf("error: methodREQCliCommand: got <1 number methodArgs")
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
return
|
||||
case len(message.MethodArgs) >= 0:
|
||||
|
@ -1238,8 +1238,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
|
|||
err := cmd.Run()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQCliCommand: cmd.Run failed : %v, methodArgs: %v, error_output: %v", err, message.MethodArgs, stderr.String())
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
}
|
||||
select {
|
||||
case outCh <- out.Bytes():
|
||||
|
@ -1252,7 +1251,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
|
|||
case <-ctx.Done():
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQCliCommand: method timed out: %v", message.MethodArgs)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
case out := <-outCh:
|
||||
cancel()
|
||||
|
||||
|
@ -1354,7 +1353,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
|
|||
switch {
|
||||
case len(message.MethodArgs) < 1:
|
||||
er := fmt.Errorf("error: methodREQHttpGet: got <1 number methodArgs")
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
return
|
||||
}
|
||||
|
@ -1370,7 +1369,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
|
|||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, bailing out: %v", err, message.MethodArgs)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
|
@ -1384,7 +1383,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
|
|||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, bailing out: %v", err, message.MethodArgs)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
@ -1392,14 +1391,14 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
|
|||
if resp.StatusCode != 200 {
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQHttpGet: not 200, where %#v, bailing out: %v", resp.StatusCode, message)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
return
|
||||
}
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, methodArgs: %v", err, message.MethodArgs)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
||||
|
@ -1416,7 +1415,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
|
|||
case <-ctx.Done():
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQHttpGet: method timed out: %v", message.MethodArgs)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
case out := <-outCh:
|
||||
cancel()
|
||||
|
||||
|
@ -1454,7 +1453,7 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) (
|
|||
switch {
|
||||
case len(message.MethodArgs) < 1:
|
||||
er := fmt.Errorf("error: methodREQTailFile: got <1 number methodArgs")
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
return
|
||||
}
|
||||
|
@ -1478,7 +1477,7 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) (
|
|||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQToTailFile: tailFile: %v", err)
|
||||
log.Printf("%v\n", er)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
}
|
||||
|
||||
proc.processes.wg.Add(1)
|
||||
|
@ -1550,7 +1549,7 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
|
|||
switch {
|
||||
case len(message.MethodArgs) < 1:
|
||||
er := fmt.Errorf("error: methodREQCliCommand: got <1 number methodArgs")
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
return
|
||||
case len(message.MethodArgs) >= 0:
|
||||
|
@ -1575,20 +1574,20 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
|
|||
outReader, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StdoutPipe failed : %v, methodArgs: %v", err, message.MethodArgs)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("error: %v\n", er)
|
||||
}
|
||||
|
||||
ErrorReader, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.StderrPipe failed : %v, methodArgs: %v", err, message.MethodArgs)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
er := fmt.Errorf("error: methodREQCliCommandCont: cmd.Start failed : %v, methodArgs: %v", err, message.MethodArgs)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
|
||||
}
|
||||
|
@ -1700,7 +1699,7 @@ func (m methodREQRelayInitial) handler(proc process, message Message, node strin
|
|||
switch {
|
||||
case len(message.MethodArgs) < 3:
|
||||
er := fmt.Errorf("error: methodREQCliCommand: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath")
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
return
|
||||
}
|
||||
|
@ -1733,11 +1732,11 @@ func (m methodREQRelayInitial) handler(proc process, message Message, node strin
|
|||
select {
|
||||
case <-ctx.Done():
|
||||
er := fmt.Errorf("error: methodREQRelayInitial: CopyFromFile: got <-ctx.Done(): %v", message.MethodArgs)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
|
||||
return
|
||||
case er := <-errCh:
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
|
||||
return
|
||||
case <-nothingCh:
|
||||
|
@ -1758,7 +1757,7 @@ func (m methodREQRelayInitial) handler(proc process, message Message, node strin
|
|||
sam, err := newSubjectAndMessage(message)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
||||
|
@ -1791,7 +1790,7 @@ func (m methodREQRelay) handler(proc process, message Message, node string) ([]b
|
|||
sam, err := newSubjectAndMessage(message)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
proc.processes.errorKernel.errSend(proc, message, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue