mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-18 21:59:30 +00:00
fixed incorrectly error handling when checking socket, set ticket to 1 when 0, send errorLogs to the correct node after rewrite of startups earlier
This commit is contained in:
parent
6922775107
commit
6da53c8ed6
3 changed files with 31 additions and 18 deletions
11
processes.go
11
processes.go
|
@ -395,7 +395,16 @@ func newStartup(server *server) *startup {
|
||||||
func (s *startup) subscriber(p process, m Method, pf func(ctx context.Context, procFuncCh chan Message) error) {
|
func (s *startup) subscriber(p process, m Method, pf func(ctx context.Context, procFuncCh chan Message) error) {
|
||||||
er := fmt.Errorf("starting %v subscriber: %#v", m, p.node)
|
er := fmt.Errorf("starting %v subscriber: %#v", m, p.node)
|
||||||
p.errorKernel.logDebug(er, p.configuration)
|
p.errorKernel.logDebug(er, p.configuration)
|
||||||
sub := newSubject(m, string(p.node))
|
|
||||||
|
var sub Subject
|
||||||
|
switch {
|
||||||
|
case m == REQErrorLog:
|
||||||
|
sub = newSubject(m, "errorCentral")
|
||||||
|
default:
|
||||||
|
sub = newSubject(m, string(p.node))
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("DEBUG:::startup subscriber, subject: %v\n", sub)
|
||||||
proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriber, nil)
|
proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriber, nil)
|
||||||
proc.procFunc = pf
|
proc.procFunc = pf
|
||||||
|
|
||||||
|
|
|
@ -20,25 +20,22 @@ func reqWriteFileOrSocket(isAppend bool, proc process, message Message) error {
|
||||||
// Check the file is a unix socket, and if it is we write the
|
// Check the file is a unix socket, and if it is we write the
|
||||||
// data to the socket instead of writing it to a normal file.
|
// data to the socket instead of writing it to a normal file.
|
||||||
fi, err := os.Stat(file)
|
fi, err := os.Stat(file)
|
||||||
if err != nil {
|
if err == nil {
|
||||||
er := fmt.Errorf("error: methodREQToFile/Append failed to stat filepath: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err)
|
if fi.Mode().Type() == fs.ModeSocket {
|
||||||
|
// TODO: Write to socket
|
||||||
|
socket, err := net.Dial("unix", file)
|
||||||
|
if err != nil {
|
||||||
|
er := fmt.Errorf("error: methodREQToFile/Append could to open socket file for writing: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err)
|
||||||
|
return er
|
||||||
|
}
|
||||||
|
defer socket.Close()
|
||||||
|
|
||||||
return er
|
_, err = socket.Write([]byte(message.Data))
|
||||||
}
|
if err != nil {
|
||||||
|
er := fmt.Errorf("error: methodREQToFile/Append could not write to socket: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err)
|
||||||
|
return er
|
||||||
|
}
|
||||||
|
|
||||||
if fi.Mode().Type() == fs.ModeSocket {
|
|
||||||
// TODO: Write to socket
|
|
||||||
socket, err := net.Dial("unix", file)
|
|
||||||
if err != nil {
|
|
||||||
er := fmt.Errorf("error: methodREQToFile/Append could to open socket file for writing: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err)
|
|
||||||
return er
|
|
||||||
}
|
|
||||||
defer socket.Close()
|
|
||||||
|
|
||||||
_, err = socket.Write([]byte(message.Data))
|
|
||||||
if err != nil {
|
|
||||||
er := fmt.Errorf("error: methodREQToFile/Append could not write to socket: subject:%v, folderTree: %v, %v", proc.subject, folderTree, err)
|
|
||||||
return er
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -284,6 +284,13 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, ringBufferOutCh
|
||||||
// Create a ticker that will kick in when a message have been in the
|
// Create a ticker that will kick in when a message have been in the
|
||||||
// system for it's maximum time. This will allow us to continue, and
|
// system for it's maximum time. This will allow us to continue, and
|
||||||
// remove the message if it takes longer than it should to get delivered.
|
// remove the message if it takes longer than it should to get delivered.
|
||||||
|
fmt.Printf("DEBUG:::%v\n", v.SAM.ACKTimeout)
|
||||||
|
if v.SAM.ACKTimeout <= 0 {
|
||||||
|
v.SAM.ACKTimeout = 1
|
||||||
|
}
|
||||||
|
if v.SAM.Retries <= 0 {
|
||||||
|
v.SAM.Retries = 1
|
||||||
|
}
|
||||||
ticker := time.NewTicker(time.Duration(v.SAM.ACKTimeout) * time.Duration(v.SAM.Retries) * time.Second)
|
ticker := time.NewTicker(time.Duration(v.SAM.ACKTimeout) * time.Duration(v.SAM.Retries) * time.Second)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue