2022-05-19 18:54:33 +00:00
package steward
import (
"context"
"fmt"
"io"
"net/http"
"strconv"
"time"
)
type methodREQHttpGet struct {
event Event
}
func ( m methodREQHttpGet ) getKind ( ) Event {
return m . event
}
// handler to do a Http Get.
func ( m methodREQHttpGet ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
inf := fmt . Errorf ( "<--- REQHttpGet received from: %v, containing: %v" , message . FromNode , message . Data )
proc . errorKernel . logConsoleOnlyIfDebug ( inf , proc . configuration )
2023-01-04 11:27:58 +00:00
msgForErrors := message
msgForErrors . FileName = msgForErrors . FileName + ".error"
2022-05-19 18:54:33 +00:00
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
switch {
case len ( message . MethodArgs ) < 1 :
er := fmt . Errorf ( "error: methodREQHttpGet: got <1 number methodArgs" )
proc . errorKernel . errSend ( proc , message , er )
2023-01-04 11:27:58 +00:00
newReplyMessage ( proc , msgForErrors , [ ] byte ( er . Error ( ) ) )
2022-05-19 18:54:33 +00:00
return
}
url := message . MethodArgs [ 0 ]
client := http . Client {
Timeout : time . Second * time . Duration ( message . MethodTimeout ) ,
}
// Get a context with the timeout specified in message.MethodTimeout.
ctx , cancel := getContextForMethodTimeout ( proc . ctx , message )
req , err := http . NewRequestWithContext ( ctx , http . MethodGet , url , nil )
if err != nil {
er := fmt . Errorf ( "error: methodREQHttpGet: NewRequest failed: %v, bailing out: %v" , err , message . MethodArgs )
proc . errorKernel . errSend ( proc , message , er )
2023-01-04 11:27:58 +00:00
newReplyMessage ( proc , msgForErrors , [ ] byte ( er . Error ( ) ) )
2022-05-19 18:54:33 +00:00
cancel ( )
return
}
outCh := make ( chan [ ] byte )
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
resp , err := client . Do ( req )
if err != nil {
er := fmt . Errorf ( "error: methodREQHttpGet: client.Do failed: %v, bailing out: %v" , err , message . MethodArgs )
proc . errorKernel . errSend ( proc , message , er )
2023-01-04 11:27:58 +00:00
newReplyMessage ( proc , msgForErrors , [ ] byte ( er . Error ( ) ) )
2022-05-19 18:54:33 +00:00
return
}
defer resp . Body . Close ( )
if resp . StatusCode != 200 {
cancel ( )
er := fmt . Errorf ( "error: methodREQHttpGet: not 200, were %#v, bailing out: %v" , resp . StatusCode , message )
proc . errorKernel . errSend ( proc , message , er )
2023-01-04 11:27:58 +00:00
newReplyMessage ( proc , msgForErrors , [ ] byte ( er . Error ( ) ) )
2022-05-19 18:54:33 +00:00
return
}
body , err := io . ReadAll ( resp . Body )
if err != nil {
er := fmt . Errorf ( "error: methodREQHttpGet: io.ReadAll failed : %v, methodArgs: %v" , err , message . MethodArgs )
proc . errorKernel . errSend ( proc , message , er )
2023-01-04 11:27:58 +00:00
newReplyMessage ( proc , msgForErrors , [ ] byte ( er . Error ( ) ) )
2022-05-19 18:54:33 +00:00
}
out := body
select {
case outCh <- out :
case <- ctx . Done ( ) :
return
}
} ( )
select {
case <- ctx . Done ( ) :
cancel ( )
er := fmt . Errorf ( "error: methodREQHttpGet: method timed out: %v" , message . MethodArgs )
proc . errorKernel . errSend ( proc , message , er )
2023-01-04 11:27:58 +00:00
newReplyMessage ( proc , msgForErrors , [ ] byte ( er . Error ( ) ) )
2022-05-19 18:54:33 +00:00
case out := <- outCh :
cancel ( )
// Prepare and queue for sending a new message with the output
// of the action executed.
newReplyMessage ( proc , message , out )
}
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
// ---
type methodREQHttpGetScheduled struct {
event Event
}
func ( m methodREQHttpGetScheduled ) getKind ( ) Event {
return m . event
}
// handler to do a Http Get Scheduled.
// The second element of the MethodArgs slice holds the timer defined in seconds.
func ( m methodREQHttpGetScheduled ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
inf := fmt . Errorf ( "<--- REQHttpGetScheduled received from: %v, containing: %v" , message . FromNode , message . Data )
proc . errorKernel . logConsoleOnlyIfDebug ( inf , proc . configuration )
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
// --- Check and prepare the methodArgs
switch {
case len ( message . MethodArgs ) < 3 :
er := fmt . Errorf ( "error: methodREQHttpGet: got <3 number methodArgs. Want URL, Schedule Interval in seconds, and the total time in minutes the scheduler should run for" )
proc . errorKernel . errSend ( proc , message , er )
return
}
url := message . MethodArgs [ 0 ]
scheduleInterval , err := strconv . Atoi ( message . MethodArgs [ 1 ] )
if err != nil {
er := fmt . Errorf ( "error: methodREQHttpGetScheduled: schedule interval value is not a valid int number defined as a string value seconds: %v, bailing out: %v" , err , message . MethodArgs )
proc . errorKernel . errSend ( proc , message , er )
return
}
schedulerTotalTime , err := strconv . Atoi ( message . MethodArgs [ 2 ] )
if err != nil {
er := fmt . Errorf ( "error: methodREQHttpGetScheduled: scheduler total time value is not a valid int number defined as a string value minutes: %v, bailing out: %v" , err , message . MethodArgs )
proc . errorKernel . errSend ( proc , message , er )
return
}
// --- Prepare and start the scheduler.
outCh := make ( chan [ ] byte )
ticker := time . NewTicker ( time . Second * time . Duration ( scheduleInterval ) )
// Prepare a context that will be for the schedule as a whole.
// NB: Individual http get's will create their own context's
// derived from this one.
ctxScheduler , cancel := context . WithTimeout ( proc . ctx , time . Minute * time . Duration ( schedulerTotalTime ) )
go func ( ) {
// Prepare the http request.
client := http . Client {
Timeout : time . Second * time . Duration ( message . MethodTimeout ) ,
}
for {
select {
case <- ticker . C :
proc . processes . wg . Add ( 1 )
// Get a context with the timeout specified in message.MethodTimeout
// for the individual http requests.
ctx , cancel := getContextForMethodTimeout ( proc . ctx , message )
req , err := http . NewRequestWithContext ( ctx , http . MethodGet , url , nil )
if err != nil {
er := fmt . Errorf ( "error: methodREQHttpGet: NewRequest failed: %v, error: %v" , err , message . MethodArgs )
proc . errorKernel . errSend ( proc , message , er )
cancel ( )
return
}
// Run each individual http get in it's own go routine, and
// deliver the result on the outCh.
go func ( ) {
defer proc . processes . wg . Done ( )
resp , err := client . Do ( req )
if err != nil {
er := fmt . Errorf ( "error: methodREQHttpGet: client.Do failed: %v, error: %v" , err , message . MethodArgs )
proc . errorKernel . errSend ( proc , message , er )
return
}
defer resp . Body . Close ( )
if resp . StatusCode != 200 {
cancel ( )
er := fmt . Errorf ( "error: methodREQHttpGet: not 200, were %#v, error: %v" , resp . StatusCode , message )
proc . errorKernel . errSend ( proc , message , er )
return
}
body , err := io . ReadAll ( resp . Body )
if err != nil {
er := fmt . Errorf ( "error: methodREQHttpGet: io.ReadAll failed : %v, methodArgs: %v" , err , message . MethodArgs )
proc . errorKernel . errSend ( proc , message , er )
}
out := body
select {
case outCh <- out :
case <- ctx . Done ( ) :
return
case <- ctxScheduler . Done ( ) :
// If the scheduler context is done then we also want to kill
// all running http request.
cancel ( )
return
}
} ( )
case <- ctxScheduler . Done ( ) :
cancel ( )
return
}
}
} ( )
for {
select {
case <- ctxScheduler . Done ( ) :
// fmt.Printf(" * DEBUG: <-ctxScheduler.Done()\n")
cancel ( )
er := fmt . Errorf ( "error: methodREQHttpGet: schedule context timed out: %v" , message . MethodArgs )
proc . errorKernel . errSend ( proc , message , er )
return
case out := <- outCh :
// Prepare and queue for sending a new message with the output
// of the action executed.
newReplyMessage ( proc , message , out )
}
}
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}