diff --git a/central_auth.go b/central_auth.go index 92aeb9d..e57e7fb 100644 --- a/central_auth.go +++ b/central_auth.go @@ -5,6 +5,8 @@ type argsString string type centralAuth struct { schema map[Node]map[argsString]signatureBase32 + + configuration *Configuration } func newCentralAuth() *centralAuth { diff --git a/configuration_flags.go b/configuration_flags.go index 5c03f49..7f13b07 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -105,6 +105,8 @@ type Configuration struct { StartSubREQToConsole bool // Subscriber for REQHttpGet StartSubREQHttpGet bool + // Subscriber for REQHttpGetScheduled + StartSubREQHttpGetScheduled bool // Subscriber for tailing log files StartSubREQTailFile bool // Subscriber for continously delivery of output from cli commands. @@ -150,21 +152,22 @@ type ConfigurationFromFile struct { EnableSignatureCheck *bool IsCentralAuth *bool - StartPubREQHello *int - StartSubREQErrorLog *bool - StartSubREQHello *bool - StartSubREQToFileAppend *bool - StartSubREQToFile *bool - StartSubREQCopyFileFrom *bool - StartSubREQCopyFileTo *bool - StartSubREQPing *bool - StartSubREQPong *bool - StartSubREQCliCommand *bool - StartSubREQToConsole *bool - StartSubREQHttpGet *bool - StartSubREQTailFile *bool - StartSubREQCliCommandCont *bool - StartSubREQRelay *bool + StartPubREQHello *int + StartSubREQErrorLog *bool + StartSubREQHello *bool + StartSubREQToFileAppend *bool + StartSubREQToFile *bool + StartSubREQCopyFileFrom *bool + StartSubREQCopyFileTo *bool + StartSubREQPing *bool + StartSubREQPong *bool + StartSubREQCliCommand *bool + StartSubREQToConsole *bool + StartSubREQHttpGet *bool + StartSubREQHttpGetScheduled *bool + StartSubREQTailFile *bool + StartSubREQCliCommandCont *bool + StartSubREQRelay *bool } // NewConfiguration will return a *Configuration. @@ -208,20 +211,21 @@ func newConfigurationDefaults() Configuration { EnableSignatureCheck: false, IsCentralAuth: false, - StartSubREQErrorLog: true, - StartSubREQHello: true, - StartSubREQToFileAppend: true, - StartSubREQToFile: true, - StartSubREQCopyFileFrom: true, - StartSubREQCopyFileTo: true, - StartSubREQPing: true, - StartSubREQPong: true, - StartSubREQCliCommand: true, - StartSubREQToConsole: true, - StartSubREQHttpGet: true, - StartSubREQTailFile: true, - StartSubREQCliCommandCont: true, - StartSubREQRelay: false, + StartSubREQErrorLog: true, + StartSubREQHello: true, + StartSubREQToFileAppend: true, + StartSubREQToFile: true, + StartSubREQCopyFileFrom: true, + StartSubREQCopyFileTo: true, + StartSubREQPing: true, + StartSubREQPong: true, + StartSubREQCliCommand: true, + StartSubREQToConsole: true, + StartSubREQHttpGet: true, + StartSubREQHttpGetScheduled: true, + StartSubREQTailFile: true, + StartSubREQCliCommandCont: true, + StartSubREQRelay: false, } return c } @@ -450,6 +454,11 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration { } else { conf.StartSubREQHttpGet = *cf.StartSubREQHttpGet } + if cf.StartSubREQHttpGetScheduled == nil { + conf.StartSubREQHttpGetScheduled = cd.StartSubREQHttpGetScheduled + } else { + conf.StartSubREQHttpGetScheduled = *cf.StartSubREQHttpGetScheduled + } if cf.StartSubREQTailFile == nil { conf.StartSubREQTailFile = cd.StartSubREQTailFile } else { @@ -542,6 +551,7 @@ func (c *Configuration) CheckFlags() error { flag.BoolVar(&c.StartSubREQCliCommand, "startSubREQCliCommand", fc.StartSubREQCliCommand, "true/false") flag.BoolVar(&c.StartSubREQToConsole, "startSubREQToConsole", fc.StartSubREQToConsole, "true/false") flag.BoolVar(&c.StartSubREQHttpGet, "startSubREQHttpGet", fc.StartSubREQHttpGet, "true/false") + flag.BoolVar(&c.StartSubREQHttpGetScheduled, "startSubREQHttpGetScheduled", fc.StartSubREQHttpGetScheduled, "true/false") flag.BoolVar(&c.StartSubREQTailFile, "startSubREQTailFile", fc.StartSubREQTailFile, "true/false") flag.BoolVar(&c.StartSubREQCliCommandCont, "startSubREQCliCommandCont", fc.StartSubREQCliCommandCont, "true/false") flag.BoolVar(&c.StartSubREQRelay, "startSubREQRelay", fc.StartSubREQRelay, "true/false") diff --git a/processes.go b/processes.go index 4e5581e..a6b26c4 100644 --- a/processes.go +++ b/processes.go @@ -176,6 +176,10 @@ func (p *processes) Start(proc process) { proc.startup.subREQHttpGet(proc) } + if proc.configuration.StartSubREQHttpGetScheduled { + proc.startup.subREQHttpGetScheduled(proc) + } + if proc.configuration.StartSubREQTailFile { proc.startup.subREQTailFile(proc) } @@ -228,6 +232,16 @@ func (s startup) subREQHttpGet(p process) { } +func (s startup) subREQHttpGetScheduled(p process) { + + log.Printf("Starting Http Get Scheduled subscriber: %#v\n", p.node) + sub := newSubject(REQHttpGetScheduled, string(p.node)) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, s.Signatures) + + go proc.spawnWorker(p.processes, p.natsConn) + +} + func (s startup) pubREQHello(p process) { log.Printf("Starting Hello Publisher: %#v\n", p.node) diff --git a/requests.go b/requests.go index 6b45f6c..1d17a70 100644 --- a/requests.go +++ b/requests.go @@ -44,6 +44,7 @@ import ( "os/exec" "path" "path/filepath" + "strconv" "strings" "sync" "time" @@ -118,6 +119,9 @@ const ( REQPong Method = "REQPong" // Http Get REQHttpGet Method = "REQHttpGet" + // Http Get Scheduled + // The second element of the MethodArgs slice holds the timer defined in seconds. + REQHttpGetScheduled Method = "REQHttpGetScheduled" // Tail file REQTailFile Method = "REQTailFile" // Write to steward socket @@ -193,6 +197,9 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { REQHttpGet: methodREQHttpGet{ event: EventACK, }, + REQHttpGetScheduled: methodREQHttpGetScheduled{ + event: EventACK, + }, REQTailFile: methodREQTailFile{ event: EventACK, }, @@ -1391,7 +1398,7 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([ if resp.StatusCode != 200 { cancel() - er := fmt.Errorf("error: methodREQHttpGet: not 200, where %#v, bailing out: %v", resp.StatusCode, message) + er := fmt.Errorf("error: methodREQHttpGet: not 200, were %#v, bailing out: %v", resp.StatusCode, message) proc.processes.errorKernel.errSend(proc, message, er) return } @@ -1431,6 +1438,156 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([ return ackMsg, nil } +// --- + +type methodREQHttpGetScheduled struct { + event Event +} + +func (m methodREQHttpGetScheduled) getKind() Event { + return m.event +} + +// handler to do a Http Get Scheduled. +// The second element of the MethodArgs slice holds the timer defined in seconds. +func (m methodREQHttpGetScheduled) handler(proc process, message Message, node string) ([]byte, error) { + log.Printf("<--- REQHttpGetScheduled received from: %v, containing: %v", message.FromNode, message.Data) + + proc.processes.wg.Add(1) + go func() { + defer proc.processes.wg.Done() + + // --- Check and prepare the methodArgs + + switch { + case len(message.MethodArgs) < 3: + er := fmt.Errorf("error: methodREQHttpGet: got <3 number methodArgs. Want URL, Schedule Interval in seconds, and the total time in minutes the scheduler should run for") + proc.processes.errorKernel.errSend(proc, message, er) + log.Printf("%v\n", er) + return + } + + url := message.MethodArgs[0] + + scheduleInterval, err := strconv.Atoi(message.MethodArgs[1]) + if err != nil { + er := fmt.Errorf("error: methodREQHttpGetScheduled: schedule interval value is not a valid int number defined as a string value seconds: %v, bailing out: %v", err, message.MethodArgs) + proc.processes.errorKernel.errSend(proc, message, er) + return + } + + // schedulerTotalTime, err := strconv.Atoi(message.MethodArgs[2]) + if err != nil { + er := fmt.Errorf("error: methodREQHttpGetScheduled: scheduler total time value is not a valid int number defined as a string value minutes: %v, bailing out: %v", err, message.MethodArgs) + proc.processes.errorKernel.errSend(proc, message, er) + return + } + + // --- Prepare and start the scheduler. + + outCh := make(chan []byte) + + ticker := time.NewTicker(time.Second * time.Duration(scheduleInterval)) + + // Prepare a context that will be for the schedule as a whole. + // NB: Individual http get's will create their own context's + // derived from this one. + ctxScheduler, cancel := context.WithTimeout(proc.ctx, time.Second*60) + + go func() { + // Prepare the http request. + client := http.Client{ + Timeout: time.Second * time.Duration(message.MethodTimeout), + } + + for { + + select { + case <-ticker.C: + proc.processes.wg.Add(1) + + // Get a context with the timeout specified in message.MethodTimeout + // for the individual http requests. + ctx, cancel := getContextForMethodTimeout(proc.ctx, message) + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + er := fmt.Errorf("error: methodREQHttpGet: NewRequest failed: %v, error: %v", err, message.MethodArgs) + proc.processes.errorKernel.errSend(proc, message, er) + cancel() + return + } + + // Run each individual http get in it's own go routine, and + // deliver the result on the outCh. + go func() { + defer proc.processes.wg.Done() + + resp, err := client.Do(req) + if err != nil { + er := fmt.Errorf("error: methodREQHttpGet: client.Do failed: %v, error: %v", err, message.MethodArgs) + proc.processes.errorKernel.errSend(proc, message, er) + return + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + cancel() + er := fmt.Errorf("error: methodREQHttpGet: not 200, were %#v, error: %v", resp.StatusCode, message) + proc.processes.errorKernel.errSend(proc, message, er) + return + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + er := fmt.Errorf("error: methodREQHttpGet: io.ReadAll failed : %v, methodArgs: %v", err, message.MethodArgs) + proc.processes.errorKernel.errSend(proc, message, er) + log.Printf("%v\n", er) + } + + out := body + + select { + case outCh <- out: + case <-ctx.Done(): + return + case <-ctxScheduler.Done(): + // If the scheduler context is done then we also want to kill + // all running http request. + cancel() + return + } + }() + + case <-ctxScheduler.Done(): + cancel() + return + + } + } + }() + + for { + select { + case <-ctxScheduler.Done(): + fmt.Printf(" * DEBUG: <-ctxScheduler.Done()\n") + cancel() + er := fmt.Errorf("error: methodREQHttpGet: schedule context timed out: %v", message.MethodArgs) + proc.processes.errorKernel.errSend(proc, message, er) + return + case out := <-outCh: + // Prepare and queue for sending a new message with the output + // of the action executed. + newReplyMessage(proc, message, out) + } + } + + }() + + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil +} + // --- methodREQTailFile type methodREQTailFile struct {