1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-05 06:46:48 +00:00

implemented wg for all handler go routines

This commit is contained in:
postmannen 2021-08-12 12:27:47 +02:00
parent 3fb9aefd78
commit e769546509
2 changed files with 70 additions and 7 deletions

View file

@ -142,7 +142,12 @@ func (p *processes) Start(proc process) {
// Stop all subscriber processes.
func (p *processes) Stop() {
log.Printf("info: canceling all subscriber processes...\n")
fmt.Printf("* DEBUG: %v\n", p.wg)
p.cancel()
p.wg.Wait()
log.Printf("info: done canceling all subscriber processes.\n")
}
// ---------------------------------------------------------------------------------------

View file

@ -282,7 +282,10 @@ type OpCmdStopProc struct {
// return the output of the command run back to the calling publisher
// in the ack message.
func (m methodREQOpCommand) handler(proc process, message Message, nodeName string) ([]byte, error) {
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
out := []byte{}
// unmarshal the json.RawMessage field called OpArgs.
@ -699,7 +702,7 @@ func (m methodREQErrorLog) handler(proc process, message Message, node string) (
// Open file and write data.
file := filepath.Join(folderTree, fileName)
f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, os.ModeAppend)
f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
if err != nil {
log.Printf("error: methodEventTextLogging.handler: failed to open file: %v\n", err)
return nil, err
@ -733,7 +736,9 @@ func (m methodREQPing) getKind() CommandOrEvent {
func (m methodREQPing) handler(proc process, message Message, node string) ([]byte, error) {
log.Printf("<--- PING REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
// Prepare and queue for sending a new message with the output
// of the action executed.
d := fmt.Sprintf("%v, ping reply sent from %v\n", time.Now().UTC(), message.ToNode)
@ -781,7 +786,10 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
// to return immediately with an ack reply that the messag was
// received, and we create a new message to send back to the calling
// node for the out put of the actual command.
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
c := message.Data[0]
a := message.Data[1:]
@ -789,7 +797,10 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
outCh := make(chan []byte)
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
cmd := exec.CommandContext(ctx, c, a...)
out, err := cmd.Output()
if err != nil {
@ -797,7 +808,11 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
outCh <- out
select {
case outCh <- out:
case <-ctx.Done():
return
}
}()
select {
@ -842,7 +857,10 @@ func (m methodREQnCliCommand) handler(proc process, message Message, node string
// to return immediately with an ack reply that the messag was
// received, and we create a new message to send back to the calling
// node for the out put of the actual command.
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
c := message.Data[0]
a := message.Data[1:]
@ -850,7 +868,10 @@ func (m methodREQnCliCommand) handler(proc process, message Message, node string
outCh := make(chan []byte)
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
cmd := exec.CommandContext(ctx, c, a...)
out, err := cmd.Output()
if err != nil {
@ -858,7 +879,12 @@ func (m methodREQnCliCommand) handler(proc process, message Message, node string
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
outCh <- out
select {
case outCh <- out:
case <-ctx.Done():
return
}
}()
select {
@ -911,7 +937,10 @@ func (m methodREQHttpGet) getKind() CommandOrEvent {
func (m methodREQHttpGet) handler(proc process, message Message, node string) ([]byte, error) {
log.Printf("<--- REQHttpGet received from: %v, containing: %v", message.FromNode, message.Data)
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
url := message.Data[0]
client := http.Client{
@ -930,7 +959,10 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
outCh := make(chan []byte)
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
resp, err := client.Do(req)
if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, bailing out: %v", err, message)
@ -946,14 +978,18 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
return
}
b, err := io.ReadAll(resp.Body)
out, err := io.ReadAll(resp.Body)
if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, message: %v", err, message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
outCh <- b
select {
case outCh <- out:
case <-ctx.Done():
return
}
}()
select {
@ -991,7 +1027,10 @@ func (m methodREQTailFile) getKind() CommandOrEvent {
func (m methodREQTailFile) handler(proc process, message Message, node string) ([]byte, error) {
log.Printf("<--- CLICommand REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
fp := message.Data[0]
var ctx context.Context
@ -1014,9 +1053,17 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) (
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
}
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
for line := range t.Lines {
outCh <- []byte(line.Text + "\n")
select {
case outCh <- []byte(line.Text + "\n"):
case <-ctx.Done():
return
}
}
}()
@ -1066,7 +1113,10 @@ func (m methodREQnCliCommandCont) handler(proc process, message Message, node st
// to return immediately with an ack reply that the message was
// received, and we create a new message to send back to the calling
// node for the out put of the actual command.
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
c := message.Data[0]
a := message.Data[1:]
@ -1074,7 +1124,10 @@ func (m methodREQnCliCommandCont) handler(proc process, message Message, node st
outCh := make(chan []byte)
proc.processes.wg.Add(1)
go func() {
proc.processes.wg.Done()
cmd := exec.CommandContext(ctx, c, a...)
// Using cmd.StdoutPipe here so we are continuosly
@ -1097,7 +1150,12 @@ func (m methodREQnCliCommandCont) handler(proc process, message Message, node st
scanner := bufio.NewScanner(outReader)
for scanner.Scan() {
outCh <- []byte(scanner.Text() + "\n")
select {
case outCh <- []byte(scanner.Text() + "\n"):
case <-ctx.Done():
return
}
}
}()