diff --git a/configuration_flags.go b/configuration_flags.go index 2c8bb12..3aa27fa 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -106,6 +106,8 @@ type Configuration struct { StartSubREQHello flagNodeSlice // Subscriber for text logging StartSubREQTextToLogFile flagNodeSlice + // Subscriber for writing to file + StartSubREQTextToFile flagNodeSlice // Subscriber for Echo Request StartSubREQPing flagNodeSlice // Subscriber for Echo Reply @@ -116,6 +118,8 @@ type Configuration struct { StartSubREQnCliCommand flagNodeSlice // Subscriber for REQTextToConsole StartSubREQTextToConsole flagNodeSlice + // Subscriber for REQHttpGet + StartSubREQHttpGet flagNodeSlice } // NewConfiguration will set a default Configuration, @@ -140,11 +144,13 @@ func newConfigurationDefaults() Configuration { StartSubREQErrorLog: flagNodeSlice{Values: []node{}}, StartSubREQHello: flagNodeSlice{OK: true, Values: []node{"*"}}, StartSubREQTextToLogFile: flagNodeSlice{OK: true, Values: []node{"*"}}, + StartSubREQTextToFile: flagNodeSlice{OK: true, Values: []node{"*"}}, StartSubREQPing: flagNodeSlice{OK: true, Values: []node{"*"}}, StartSubREQPong: flagNodeSlice{OK: true, Values: []node{"*"}}, StartSubREQCliCommand: flagNodeSlice{OK: true, Values: []node{"*"}}, StartSubREQnCliCommand: flagNodeSlice{OK: true, Values: []node{"*"}}, StartSubREQTextToConsole: flagNodeSlice{OK: true, Values: []node{"*"}}, + StartSubREQHttpGet: flagNodeSlice{OK: true, Values: []node{"*"}}, } return c } @@ -180,11 +186,13 @@ func (c *Configuration) CheckFlags() error { flag.Var(&c.StartSubREQErrorLog, "startSubREQErrorLog", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.") flag.Var(&c.StartSubREQHello, "startSubREQHello", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.") flag.Var(&c.StartSubREQTextToLogFile, "startSubREQTextToLogFile", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.") + flag.Var(&c.StartSubREQTextToFile, "startSubREQTextToFile", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.") flag.Var(&c.StartSubREQPing, "startSubREQPing", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.") flag.Var(&c.StartSubREQPong, "startSubREQPong", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.") flag.Var(&c.StartSubREQCliCommand, "startSubREQCliCommand", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.") flag.Var(&c.StartSubREQnCliCommand, "startSubREQnCliCommand", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.") flag.Var(&c.StartSubREQTextToConsole, "startSubREQTextToConsole", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.") + flag.Var(&c.StartSubREQHttpGet, "startSubREQHttpGet", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.") flag.Parse() diff --git a/etc/config.toml b/etc/config.toml index e9d0e8c..1a098b8 100644 --- a/etc/config.toml +++ b/etc/config.toml @@ -21,6 +21,10 @@ SubscribersDataFolder = "./var" OK = true Values = ["*"] +[StartSubREQHttpGet] + OK = true + Values = ["*"] + [StartSubREQPing] OK = true Values = ["*"] @@ -33,6 +37,10 @@ SubscribersDataFolder = "./var" OK = true Values = ["*"] +[StartSubREQTextToFile] + OK = true + Values = ["*"] + [StartSubREQTextToLogFile] OK = true Values = ["*"] diff --git a/example/toShip1-REQHttpGet.json b/example/toShip1-REQHttpGet.json new file mode 100644 index 0000000..bce26b3 --- /dev/null +++ b/example/toShip1-REQHttpGet.json @@ -0,0 +1,11 @@ +[ + { + "label":"metrics/vg.no/annet/minside/", + "toNode": "ship1", + "data": ["http://vg.no"], + "method":"REQHttpGet", + "timeout":10, + "retries":3, + "methodTimeout": 5 + } +] \ No newline at end of file diff --git a/startup_processes.go b/startup_processes.go index 0fd1aa6..6ae1967 100644 --- a/startup_processes.go +++ b/startup_processes.go @@ -30,6 +30,17 @@ func (s *server) ProcessesStart() { } } + // Start a subscriber for text to file messages + if s.configuration.StartSubREQTextToFile.OK { + { + fmt.Printf("Starting text to file subscriber: %#v\n", s.nodeName) + sub := newSubject(REQTextToFile, s.nodeName) + proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQTextToFile.Values, nil) + // fmt.Printf("*** %#v\n", proc) + go proc.spawnWorker(s) + } + } + // Start a subscriber for Hello messages if s.configuration.StartSubREQHello.OK { { @@ -163,4 +174,15 @@ func (s *server) ProcessesStart() { }) go proc.spawnWorker(s) } + + // Start a subscriber for Http Get Requests + if s.configuration.StartSubREQHttpGet.OK { + { + fmt.Printf("Starting Http Get subscriber: %#v\n", s.nodeName) + sub := newSubject(REQHttpGet, s.nodeName) + proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQHttpGet.Values, nil) + // fmt.Printf("*** %#v\n", proc) + go proc.spawnWorker(s) + } + } } diff --git a/subscriber_method_types.go b/subscriber_method_types.go index 72e4b53..f71bcd2 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -35,7 +35,9 @@ package steward import ( "context" "fmt" + "io" "log" + "net/http" "os" "os/exec" "path/filepath" @@ -103,6 +105,8 @@ const ( REQPing Method = "REQPing" // Will generate a reply for a ECHORequest REQPong Method = "REQPong" + // Http Get + REQHttpGet Method = "REQHttpGet" ) // The mapping of all the method constants specified, what type @@ -149,6 +153,9 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { REQPong: methodREQPong{ commandOrEvent: EventACK, }, + REQHttpGet: methodREQHttpGet{ + commandOrEvent: EventACK, + }, }, } @@ -372,7 +379,7 @@ func (m methodREQTextToFile) handler(proc process, message Message, node string) // Open file and write data. file := filepath.Join(folderTree, fileName) - f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_CREATE, 0755) + f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755) if err != nil { log.Printf("error: methodEventTextLogging.handler: failed to open file: %v\n", err) return nil, err @@ -651,3 +658,79 @@ func (m methodREQTextToConsole) handler(proc process, message Message, node stri ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil } + +// --- + +type methodREQHttpGet struct { + commandOrEvent CommandOrEvent +} + +func (m methodREQHttpGet) getKind() CommandOrEvent { + return m.commandOrEvent +} + +// handler to run a Http Get. +func (m methodREQHttpGet) handler(proc process, message Message, node string) ([]byte, error) { + log.Printf("<--- REQHttpGet received from: %v, containing: %v", message.FromNode, message.Data) + + go func() { + url := message.Data[0] + + client := http.Client{ + Timeout: time.Second * 5, + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout)) + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + er := fmt.Errorf("error: NewRequest failed: %v, bailing out: %v", err, message) + sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + cancel() + return + } + + outCh := make(chan []byte) + + go func() { + resp, err := client.Do(req) + if err != nil { + er := fmt.Errorf("error: client.Do failed: %v, bailing out: %v", err, message) + sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + return + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + cancel() + er := fmt.Errorf("error: not 200, where %#v, bailing out: %v", resp.StatusCode, message) + sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + return + } + + b, err := io.ReadAll(resp.Body) + if err != nil { + log.Printf("error: ReadAll failed: %v\n", err) + } + + outCh <- b + }() + + select { + case <-ctx.Done(): + cancel() + er := fmt.Errorf("error: method timed out %v", message) + sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + case out := <-outCh: + cancel() + + // Prepare and queue for sending a new message with the output + // of the action executed. + newReplyMessage(proc, message, REQTextToFile, out) + } + + }() + + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil +}