1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00
ctrl/requests_http.go
postmannen 2fb43591ce updated readme
removed example for no longer existing relay messages
cleaned up comments
Removed some remaings after REQToFileNACK
Implemented env variables for all flags, and removed config flag. Also added use of .env file.
removed configuration as input argument from all the loggers
replaced logging of new messages in read folder with a logDebug so we don't send those messages to the error kernel
2024-03-27 12:48:17 +01:00

250 lines
7.3 KiB
Go

package ctrl
import (
"context"
"fmt"
"io"
"net/http"
"strconv"
"time"
)
// handler to do a Http Get.
func methodREQHttpGet(proc process, message Message, node string) ([]byte, error) {
er := fmt.Errorf("<--- REQHttpGet received from: %v, containing: %v", message.FromNode, message.Data)
proc.errorKernel.logDebug(er)
msgForErrors := message
msgForErrors.FileName = msgForErrors.FileName + ".error"
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
switch {
case len(message.MethodArgs) < 1:
er := fmt.Errorf("error: methodREQHttpGet: got <1 number methodArgs")
proc.errorKernel.errSend(proc, message, er, logWarning)
newReplyMessage(proc, msgForErrors, []byte(er.Error()))
return
}
url := message.MethodArgs[0]
client := http.Client{
Timeout: time.Second * time.Duration(message.MethodTimeout),
}
// Get a context with the timeout specified in message.MethodTimeout.
ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, bailing out: %v", err, message.MethodArgs)
proc.errorKernel.errSend(proc, message, er, logWarning)
newReplyMessage(proc, msgForErrors, []byte(er.Error()))
cancel()
return
}
outCh := make(chan []byte)
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
resp, err := client.Do(req)
if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, bailing out: %v", err, message.MethodArgs)
proc.errorKernel.errSend(proc, message, er, logWarning)
newReplyMessage(proc, msgForErrors, []byte(er.Error()))
return
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
cancel()
er := fmt.Errorf("error: methodREQHttpGet: not 200, were %#v, bailing out: %v", resp.StatusCode, message)
proc.errorKernel.errSend(proc, message, er, logWarning)
newReplyMessage(proc, msgForErrors, []byte(er.Error()))
return
}
body, err := io.ReadAll(resp.Body)
if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, methodArgs: %v", err, message.MethodArgs)
proc.errorKernel.errSend(proc, message, er, logWarning)
newReplyMessage(proc, msgForErrors, []byte(er.Error()))
}
out := body
select {
case outCh <- out:
case <-ctx.Done():
return
}
}()
select {
case <-ctx.Done():
cancel()
er := fmt.Errorf("error: methodREQHttpGet: method timed out: %v", message.MethodArgs)
proc.errorKernel.errSend(proc, message, er, logWarning)
newReplyMessage(proc, msgForErrors, []byte(er.Error()))
case out := <-outCh:
cancel()
// Prepare and queue for sending a new message with the output
// of the action executed.
newReplyMessage(proc, message, out)
}
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ---
// handler to do a Http Get Scheduled.
// The second element of the MethodArgs slice holds the timer defined in seconds.
func methodREQHttpGetScheduled(proc process, message Message, node string) ([]byte, error) {
er := fmt.Errorf("<--- REQHttpGetScheduled received from: %v, containing: %v", message.FromNode, message.Data)
proc.errorKernel.logDebug(er)
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
// --- Check and prepare the methodArgs
switch {
case len(message.MethodArgs) < 3:
er := fmt.Errorf("error: methodREQHttpGet: got <3 number methodArgs. Want URL, Schedule Interval in seconds, and the total time in minutes the scheduler should run for")
proc.errorKernel.errSend(proc, message, er, logWarning)
return
}
url := message.MethodArgs[0]
scheduleInterval, err := strconv.Atoi(message.MethodArgs[1])
if err != nil {
er := fmt.Errorf("error: methodREQHttpGetScheduled: schedule interval value is not a valid int number defined as a string value seconds: %v, bailing out: %v", err, message.MethodArgs)
proc.errorKernel.errSend(proc, message, er, logWarning)
return
}
schedulerTotalTime, err := strconv.Atoi(message.MethodArgs[2])
if err != nil {
er := fmt.Errorf("error: methodREQHttpGetScheduled: scheduler total time value is not a valid int number defined as a string value minutes: %v, bailing out: %v", err, message.MethodArgs)
proc.errorKernel.errSend(proc, message, er, logWarning)
return
}
// --- Prepare and start the scheduler.
outCh := make(chan []byte)
ticker := time.NewTicker(time.Second * time.Duration(scheduleInterval))
// Prepare a context that will be for the schedule as a whole.
// Individual http get's will create their own context's
// derived from this one.
ctxScheduler, cancel := context.WithTimeout(proc.ctx, time.Minute*time.Duration(schedulerTotalTime))
go func() {
// Prepare the http request.
client := http.Client{
Timeout: time.Second * time.Duration(message.MethodTimeout),
}
for {
select {
case <-ticker.C:
proc.processes.wg.Add(1)
// Get a context with the timeout specified in message.MethodTimeout
// for the individual http requests.
ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, error: %v", err, message.MethodArgs)
proc.errorKernel.errSend(proc, message, er, logWarning)
cancel()
return
}
// Run each individual http get in it's own go routine, and
// deliver the result on the outCh.
go func() {
defer proc.processes.wg.Done()
resp, err := client.Do(req)
if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, error: %v", err, message.MethodArgs)
proc.errorKernel.errSend(proc, message, er, logWarning)
return
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
cancel()
er := fmt.Errorf("error: methodREQHttpGet: not 200, were %#v, error: %v", resp.StatusCode, message)
proc.errorKernel.errSend(proc, message, er, logWarning)
return
}
body, err := io.ReadAll(resp.Body)
if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, methodArgs: %v", err, message.MethodArgs)
proc.errorKernel.errSend(proc, message, er, logWarning)
}
out := body
select {
case outCh <- out:
case <-ctx.Done():
return
case <-ctxScheduler.Done():
// If the scheduler context is done then we also want to kill
// all running http request.
cancel()
return
}
}()
case <-ctxScheduler.Done():
cancel()
return
}
}
}()
for {
select {
case <-ctxScheduler.Done():
// fmt.Printf(" * DEBUG: <-ctxScheduler.Done()\n")
cancel()
er := fmt.Errorf("error: methodREQHttpGet: schedule context timed out: %v", message.MethodArgs)
proc.errorKernel.errSend(proc, message, er, logWarning)
return
case out := <-outCh:
// Prepare and queue for sending a new message with the output
// of the action executed.
newReplyMessage(proc, message, out)
}
}
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}