diff --git a/processes.go b/processes.go index 29b6de3..a402771 100644 --- a/processes.go +++ b/processes.go @@ -395,7 +395,16 @@ func newStartup(server *server) *startup { func (s *startup) subscriber(p process, m Method, pf func(ctx context.Context, procFuncCh chan Message) error) { er := fmt.Errorf("starting %v subscriber: %#v", m, p.node) p.errorKernel.logDebug(er, p.configuration) - sub := newSubject(m, string(p.node)) + + var sub Subject + switch { + case m == REQErrorLog: + sub = newSubject(m, "errorCentral") + default: + sub = newSubject(m, string(p.node)) + } + + fmt.Printf("DEBUG:::startup subscriber, subject: %v\n", sub) proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriber, nil) proc.procFunc = pf diff --git a/requests_file_handling.go b/requests_file_handling.go index 887d383..73b873e 100644 --- a/requests_file_handling.go +++ b/requests_file_handling.go @@ -20,25 +20,22 @@ func reqWriteFileOrSocket(isAppend bool, proc process, message Message) error { // Check the file is a unix socket, and if it is we write the // data to the socket instead of writing it to a normal file. fi, err := os.Stat(file) - if err != nil { - er := fmt.Errorf("error: methodREQToFile/Append failed to stat filepath: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err) + if err == nil { + if fi.Mode().Type() == fs.ModeSocket { + // TODO: Write to socket + socket, err := net.Dial("unix", file) + if err != nil { + er := fmt.Errorf("error: methodREQToFile/Append could to open socket file for writing: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err) + return er + } + defer socket.Close() - return er - } + _, err = socket.Write([]byte(message.Data)) + if err != nil { + er := fmt.Errorf("error: methodREQToFile/Append could not write to socket: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err) + return er + } - if fi.Mode().Type() == fs.ModeSocket { - // TODO: Write to socket - socket, err := net.Dial("unix", file) - if err != nil { - er := fmt.Errorf("error: methodREQToFile/Append could to open socket file for writing: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err) - return er - } - defer socket.Close() - - _, err = socket.Write([]byte(message.Data)) - if err != nil { - er := fmt.Errorf("error: methodREQToFile/Append could not write to socket: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err) - return er } } diff --git a/ringbuffer.go b/ringbuffer.go index f051a10..a9a6a77 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -284,6 +284,13 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, ringBufferOutCh // Create a ticker that will kick in when a message have been in the // system for it's maximum time. This will allow us to continue, and // remove the message if it takes longer than it should to get delivered. + fmt.Printf("DEBUG:::%v\n", v.SAM.ACKTimeout) + if v.SAM.ACKTimeout <= 0 { + v.SAM.ACKTimeout = 1 + } + if v.SAM.Retries <= 0 { + v.SAM.Retries = 1 + } ticker := time.NewTicker(time.Duration(v.SAM.ACKTimeout) * time.Duration(v.SAM.Retries) * time.Second) defer ticker.Stop()