1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

added REQHttpGetScheduled

This commit is contained in:
postmannen 2022-02-11 07:27:51 +01:00
parent fbeb3bf9f6
commit b11b8d6baf
4 changed files with 213 additions and 30 deletions

View file

@ -5,6 +5,8 @@ type argsString string
type centralAuth struct {
schema map[Node]map[argsString]signatureBase32
configuration *Configuration
}
func newCentralAuth() *centralAuth {

View file

@ -105,6 +105,8 @@ type Configuration struct {
StartSubREQToConsole bool
// Subscriber for REQHttpGet
StartSubREQHttpGet bool
// Subscriber for REQHttpGetScheduled
StartSubREQHttpGetScheduled bool
// Subscriber for tailing log files
StartSubREQTailFile bool
// Subscriber for continously delivery of output from cli commands.
@ -150,21 +152,22 @@ type ConfigurationFromFile struct {
EnableSignatureCheck *bool
IsCentralAuth *bool
StartPubREQHello *int
StartSubREQErrorLog *bool
StartSubREQHello *bool
StartSubREQToFileAppend *bool
StartSubREQToFile *bool
StartSubREQCopyFileFrom *bool
StartSubREQCopyFileTo *bool
StartSubREQPing *bool
StartSubREQPong *bool
StartSubREQCliCommand *bool
StartSubREQToConsole *bool
StartSubREQHttpGet *bool
StartSubREQTailFile *bool
StartSubREQCliCommandCont *bool
StartSubREQRelay *bool
StartPubREQHello *int
StartSubREQErrorLog *bool
StartSubREQHello *bool
StartSubREQToFileAppend *bool
StartSubREQToFile *bool
StartSubREQCopyFileFrom *bool
StartSubREQCopyFileTo *bool
StartSubREQPing *bool
StartSubREQPong *bool
StartSubREQCliCommand *bool
StartSubREQToConsole *bool
StartSubREQHttpGet *bool
StartSubREQHttpGetScheduled *bool
StartSubREQTailFile *bool
StartSubREQCliCommandCont *bool
StartSubREQRelay *bool
}
// NewConfiguration will return a *Configuration.
@ -208,20 +211,21 @@ func newConfigurationDefaults() Configuration {
EnableSignatureCheck: false,
IsCentralAuth: false,
StartSubREQErrorLog: true,
StartSubREQHello: true,
StartSubREQToFileAppend: true,
StartSubREQToFile: true,
StartSubREQCopyFileFrom: true,
StartSubREQCopyFileTo: true,
StartSubREQPing: true,
StartSubREQPong: true,
StartSubREQCliCommand: true,
StartSubREQToConsole: true,
StartSubREQHttpGet: true,
StartSubREQTailFile: true,
StartSubREQCliCommandCont: true,
StartSubREQRelay: false,
StartSubREQErrorLog: true,
StartSubREQHello: true,
StartSubREQToFileAppend: true,
StartSubREQToFile: true,
StartSubREQCopyFileFrom: true,
StartSubREQCopyFileTo: true,
StartSubREQPing: true,
StartSubREQPong: true,
StartSubREQCliCommand: true,
StartSubREQToConsole: true,
StartSubREQHttpGet: true,
StartSubREQHttpGetScheduled: true,
StartSubREQTailFile: true,
StartSubREQCliCommandCont: true,
StartSubREQRelay: false,
}
return c
}
@ -450,6 +454,11 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration {
} else {
conf.StartSubREQHttpGet = *cf.StartSubREQHttpGet
}
if cf.StartSubREQHttpGetScheduled == nil {
conf.StartSubREQHttpGetScheduled = cd.StartSubREQHttpGetScheduled
} else {
conf.StartSubREQHttpGetScheduled = *cf.StartSubREQHttpGetScheduled
}
if cf.StartSubREQTailFile == nil {
conf.StartSubREQTailFile = cd.StartSubREQTailFile
} else {
@ -542,6 +551,7 @@ func (c *Configuration) CheckFlags() error {
flag.BoolVar(&c.StartSubREQCliCommand, "startSubREQCliCommand", fc.StartSubREQCliCommand, "true/false")
flag.BoolVar(&c.StartSubREQToConsole, "startSubREQToConsole", fc.StartSubREQToConsole, "true/false")
flag.BoolVar(&c.StartSubREQHttpGet, "startSubREQHttpGet", fc.StartSubREQHttpGet, "true/false")
flag.BoolVar(&c.StartSubREQHttpGetScheduled, "startSubREQHttpGetScheduled", fc.StartSubREQHttpGetScheduled, "true/false")
flag.BoolVar(&c.StartSubREQTailFile, "startSubREQTailFile", fc.StartSubREQTailFile, "true/false")
flag.BoolVar(&c.StartSubREQCliCommandCont, "startSubREQCliCommandCont", fc.StartSubREQCliCommandCont, "true/false")
flag.BoolVar(&c.StartSubREQRelay, "startSubREQRelay", fc.StartSubREQRelay, "true/false")

View file

@ -176,6 +176,10 @@ func (p *processes) Start(proc process) {
proc.startup.subREQHttpGet(proc)
}
if proc.configuration.StartSubREQHttpGetScheduled {
proc.startup.subREQHttpGetScheduled(proc)
}
if proc.configuration.StartSubREQTailFile {
proc.startup.subREQTailFile(proc)
}
@ -228,6 +232,16 @@ func (s startup) subREQHttpGet(p process) {
}
func (s startup) subREQHttpGetScheduled(p process) {
log.Printf("Starting Http Get Scheduled subscriber: %#v\n", p.node)
sub := newSubject(REQHttpGetScheduled, string(p.node))
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, s.Signatures)
go proc.spawnWorker(p.processes, p.natsConn)
}
func (s startup) pubREQHello(p process) {
log.Printf("Starting Hello Publisher: %#v\n", p.node)

View file

@ -44,6 +44,7 @@ import (
"os/exec"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
@ -118,6 +119,9 @@ const (
REQPong Method = "REQPong"
// Http Get
REQHttpGet Method = "REQHttpGet"
// Http Get Scheduled
// The second element of the MethodArgs slice holds the timer defined in seconds.
REQHttpGetScheduled Method = "REQHttpGetScheduled"
// Tail file
REQTailFile Method = "REQTailFile"
// Write to steward socket
@ -193,6 +197,9 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
REQHttpGet: methodREQHttpGet{
event: EventACK,
},
REQHttpGetScheduled: methodREQHttpGetScheduled{
event: EventACK,
},
REQTailFile: methodREQTailFile{
event: EventACK,
},
@ -1391,7 +1398,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
if resp.StatusCode != 200 {
cancel()
er := fmt.Errorf("error: methodREQHttpGet: not 200, where %#v, bailing out: %v", resp.StatusCode, message)
er := fmt.Errorf("error: methodREQHttpGet: not 200, were %#v, bailing out: %v", resp.StatusCode, message)
proc.processes.errorKernel.errSend(proc, message, er)
return
}
@ -1431,6 +1438,156 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
return ackMsg, nil
}
// ---
type methodREQHttpGetScheduled struct {
event Event
}
func (m methodREQHttpGetScheduled) getKind() Event {
return m.event
}
// handler to do a Http Get Scheduled.
// The second element of the MethodArgs slice holds the timer defined in seconds.
func (m methodREQHttpGetScheduled) handler(proc process, message Message, node string) ([]byte, error) {
log.Printf("<--- REQHttpGetScheduled received from: %v, containing: %v", message.FromNode, message.Data)
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
// --- Check and prepare the methodArgs
switch {
case len(message.MethodArgs) < 3:
er := fmt.Errorf("error: methodREQHttpGet: got <3 number methodArgs. Want URL, Schedule Interval in seconds, and the total time in minutes the scheduler should run for")
proc.processes.errorKernel.errSend(proc, message, er)
log.Printf("%v\n", er)
return
}
url := message.MethodArgs[0]
scheduleInterval, err := strconv.Atoi(message.MethodArgs[1])
if err != nil {
er := fmt.Errorf("error: methodREQHttpGetScheduled: schedule interval value is not a valid int number defined as a string value seconds: %v, bailing out: %v", err, message.MethodArgs)
proc.processes.errorKernel.errSend(proc, message, er)
return
}
// schedulerTotalTime, err := strconv.Atoi(message.MethodArgs[2])
if err != nil {
er := fmt.Errorf("error: methodREQHttpGetScheduled: scheduler total time value is not a valid int number defined as a string value minutes: %v, bailing out: %v", err, message.MethodArgs)
proc.processes.errorKernel.errSend(proc, message, er)
return
}
// --- Prepare and start the scheduler.
outCh := make(chan []byte)
ticker := time.NewTicker(time.Second * time.Duration(scheduleInterval))
// Prepare a context that will be for the schedule as a whole.
// NB: Individual http get's will create their own context's
// derived from this one.
ctxScheduler, cancel := context.WithTimeout(proc.ctx, time.Second*60)
go func() {
// Prepare the http request.
client := http.Client{
Timeout: time.Second * time.Duration(message.MethodTimeout),
}
for {
select {
case <-ticker.C:
proc.processes.wg.Add(1)
// Get a context with the timeout specified in message.MethodTimeout
// for the individual http requests.
ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, error: %v", err, message.MethodArgs)
proc.processes.errorKernel.errSend(proc, message, er)
cancel()
return
}
// Run each individual http get in it's own go routine, and
// deliver the result on the outCh.
go func() {
defer proc.processes.wg.Done()
resp, err := client.Do(req)
if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, error: %v", err, message.MethodArgs)
proc.processes.errorKernel.errSend(proc, message, er)
return
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
cancel()
er := fmt.Errorf("error: methodREQHttpGet: not 200, were %#v, error: %v", resp.StatusCode, message)
proc.processes.errorKernel.errSend(proc, message, er)
return
}
body, err := io.ReadAll(resp.Body)
if err != nil {
er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, methodArgs: %v", err, message.MethodArgs)
proc.processes.errorKernel.errSend(proc, message, er)
log.Printf("%v\n", er)
}
out := body
select {
case outCh <- out:
case <-ctx.Done():
return
case <-ctxScheduler.Done():
// If the scheduler context is done then we also want to kill
// all running http request.
cancel()
return
}
}()
case <-ctxScheduler.Done():
cancel()
return
}
}
}()
for {
select {
case <-ctxScheduler.Done():
fmt.Printf(" * DEBUG: <-ctxScheduler.Done()\n")
cancel()
er := fmt.Errorf("error: methodREQHttpGet: schedule context timed out: %v", message.MethodArgs)
proc.processes.errorKernel.errSend(proc, message, er)
return
case out := <-outCh:
// Prepare and queue for sending a new message with the output
// of the action executed.
newReplyMessage(proc, message, out)
}
}
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// --- methodREQTailFile
type methodREQTailFile struct {