mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-18 21:59:30 +00:00
Initial http get method implemented
This commit is contained in:
parent
685c0c7844
commit
ef8fcee368
5 changed files with 133 additions and 1 deletions
|
@ -106,6 +106,8 @@ type Configuration struct {
|
||||||
StartSubREQHello flagNodeSlice
|
StartSubREQHello flagNodeSlice
|
||||||
// Subscriber for text logging
|
// Subscriber for text logging
|
||||||
StartSubREQTextToLogFile flagNodeSlice
|
StartSubREQTextToLogFile flagNodeSlice
|
||||||
|
// Subscriber for writing to file
|
||||||
|
StartSubREQTextToFile flagNodeSlice
|
||||||
// Subscriber for Echo Request
|
// Subscriber for Echo Request
|
||||||
StartSubREQPing flagNodeSlice
|
StartSubREQPing flagNodeSlice
|
||||||
// Subscriber for Echo Reply
|
// Subscriber for Echo Reply
|
||||||
|
@ -116,6 +118,8 @@ type Configuration struct {
|
||||||
StartSubREQnCliCommand flagNodeSlice
|
StartSubREQnCliCommand flagNodeSlice
|
||||||
// Subscriber for REQTextToConsole
|
// Subscriber for REQTextToConsole
|
||||||
StartSubREQTextToConsole flagNodeSlice
|
StartSubREQTextToConsole flagNodeSlice
|
||||||
|
// Subscriber for REQHttpGet
|
||||||
|
StartSubREQHttpGet flagNodeSlice
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConfiguration will set a default Configuration,
|
// NewConfiguration will set a default Configuration,
|
||||||
|
@ -140,11 +144,13 @@ func newConfigurationDefaults() Configuration {
|
||||||
StartSubREQErrorLog: flagNodeSlice{Values: []node{}},
|
StartSubREQErrorLog: flagNodeSlice{Values: []node{}},
|
||||||
StartSubREQHello: flagNodeSlice{OK: true, Values: []node{"*"}},
|
StartSubREQHello: flagNodeSlice{OK: true, Values: []node{"*"}},
|
||||||
StartSubREQTextToLogFile: flagNodeSlice{OK: true, Values: []node{"*"}},
|
StartSubREQTextToLogFile: flagNodeSlice{OK: true, Values: []node{"*"}},
|
||||||
|
StartSubREQTextToFile: flagNodeSlice{OK: true, Values: []node{"*"}},
|
||||||
StartSubREQPing: flagNodeSlice{OK: true, Values: []node{"*"}},
|
StartSubREQPing: flagNodeSlice{OK: true, Values: []node{"*"}},
|
||||||
StartSubREQPong: flagNodeSlice{OK: true, Values: []node{"*"}},
|
StartSubREQPong: flagNodeSlice{OK: true, Values: []node{"*"}},
|
||||||
StartSubREQCliCommand: flagNodeSlice{OK: true, Values: []node{"*"}},
|
StartSubREQCliCommand: flagNodeSlice{OK: true, Values: []node{"*"}},
|
||||||
StartSubREQnCliCommand: flagNodeSlice{OK: true, Values: []node{"*"}},
|
StartSubREQnCliCommand: flagNodeSlice{OK: true, Values: []node{"*"}},
|
||||||
StartSubREQTextToConsole: flagNodeSlice{OK: true, Values: []node{"*"}},
|
StartSubREQTextToConsole: flagNodeSlice{OK: true, Values: []node{"*"}},
|
||||||
|
StartSubREQHttpGet: flagNodeSlice{OK: true, Values: []node{"*"}},
|
||||||
}
|
}
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
@ -180,11 +186,13 @@ func (c *Configuration) CheckFlags() error {
|
||||||
flag.Var(&c.StartSubREQErrorLog, "startSubREQErrorLog", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
flag.Var(&c.StartSubREQErrorLog, "startSubREQErrorLog", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
||||||
flag.Var(&c.StartSubREQHello, "startSubREQHello", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
flag.Var(&c.StartSubREQHello, "startSubREQHello", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
||||||
flag.Var(&c.StartSubREQTextToLogFile, "startSubREQTextToLogFile", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
flag.Var(&c.StartSubREQTextToLogFile, "startSubREQTextToLogFile", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
||||||
|
flag.Var(&c.StartSubREQTextToFile, "startSubREQTextToFile", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
||||||
flag.Var(&c.StartSubREQPing, "startSubREQPing", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
flag.Var(&c.StartSubREQPing, "startSubREQPing", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
||||||
flag.Var(&c.StartSubREQPong, "startSubREQPong", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
flag.Var(&c.StartSubREQPong, "startSubREQPong", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
||||||
flag.Var(&c.StartSubREQCliCommand, "startSubREQCliCommand", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
flag.Var(&c.StartSubREQCliCommand, "startSubREQCliCommand", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
||||||
flag.Var(&c.StartSubREQnCliCommand, "startSubREQnCliCommand", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
flag.Var(&c.StartSubREQnCliCommand, "startSubREQnCliCommand", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
||||||
flag.Var(&c.StartSubREQTextToConsole, "startSubREQTextToConsole", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
flag.Var(&c.StartSubREQTextToConsole, "startSubREQTextToConsole", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
||||||
|
flag.Var(&c.StartSubREQHttpGet, "startSubREQHttpGet", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
||||||
|
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,10 @@ SubscribersDataFolder = "./var"
|
||||||
OK = true
|
OK = true
|
||||||
Values = ["*"]
|
Values = ["*"]
|
||||||
|
|
||||||
|
[StartSubREQHttpGet]
|
||||||
|
OK = true
|
||||||
|
Values = ["*"]
|
||||||
|
|
||||||
[StartSubREQPing]
|
[StartSubREQPing]
|
||||||
OK = true
|
OK = true
|
||||||
Values = ["*"]
|
Values = ["*"]
|
||||||
|
@ -33,6 +37,10 @@ SubscribersDataFolder = "./var"
|
||||||
OK = true
|
OK = true
|
||||||
Values = ["*"]
|
Values = ["*"]
|
||||||
|
|
||||||
|
[StartSubREQTextToFile]
|
||||||
|
OK = true
|
||||||
|
Values = ["*"]
|
||||||
|
|
||||||
[StartSubREQTextToLogFile]
|
[StartSubREQTextToLogFile]
|
||||||
OK = true
|
OK = true
|
||||||
Values = ["*"]
|
Values = ["*"]
|
||||||
|
|
11
example/toShip1-REQHttpGet.json
Normal file
11
example/toShip1-REQHttpGet.json
Normal file
|
@ -0,0 +1,11 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"label":"metrics/vg.no/annet/minside/",
|
||||||
|
"toNode": "ship1",
|
||||||
|
"data": ["http://vg.no"],
|
||||||
|
"method":"REQHttpGet",
|
||||||
|
"timeout":10,
|
||||||
|
"retries":3,
|
||||||
|
"methodTimeout": 5
|
||||||
|
}
|
||||||
|
]
|
|
@ -30,6 +30,17 @@ func (s *server) ProcessesStart() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Start a subscriber for text to file messages
|
||||||
|
if s.configuration.StartSubREQTextToFile.OK {
|
||||||
|
{
|
||||||
|
fmt.Printf("Starting text to file subscriber: %#v\n", s.nodeName)
|
||||||
|
sub := newSubject(REQTextToFile, s.nodeName)
|
||||||
|
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQTextToFile.Values, nil)
|
||||||
|
// fmt.Printf("*** %#v\n", proc)
|
||||||
|
go proc.spawnWorker(s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Start a subscriber for Hello messages
|
// Start a subscriber for Hello messages
|
||||||
if s.configuration.StartSubREQHello.OK {
|
if s.configuration.StartSubREQHello.OK {
|
||||||
{
|
{
|
||||||
|
@ -163,4 +174,15 @@ func (s *server) ProcessesStart() {
|
||||||
})
|
})
|
||||||
go proc.spawnWorker(s)
|
go proc.spawnWorker(s)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Start a subscriber for Http Get Requests
|
||||||
|
if s.configuration.StartSubREQHttpGet.OK {
|
||||||
|
{
|
||||||
|
fmt.Printf("Starting Http Get subscriber: %#v\n", s.nodeName)
|
||||||
|
sub := newSubject(REQHttpGet, s.nodeName)
|
||||||
|
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubREQHttpGet.Values, nil)
|
||||||
|
// fmt.Printf("*** %#v\n", proc)
|
||||||
|
go proc.spawnWorker(s)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,7 +35,9 @@ package steward
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"log"
|
"log"
|
||||||
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
@ -103,6 +105,8 @@ const (
|
||||||
REQPing Method = "REQPing"
|
REQPing Method = "REQPing"
|
||||||
// Will generate a reply for a ECHORequest
|
// Will generate a reply for a ECHORequest
|
||||||
REQPong Method = "REQPong"
|
REQPong Method = "REQPong"
|
||||||
|
// Http Get
|
||||||
|
REQHttpGet Method = "REQHttpGet"
|
||||||
)
|
)
|
||||||
|
|
||||||
// The mapping of all the method constants specified, what type
|
// The mapping of all the method constants specified, what type
|
||||||
|
@ -149,6 +153,9 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
|
||||||
REQPong: methodREQPong{
|
REQPong: methodREQPong{
|
||||||
commandOrEvent: EventACK,
|
commandOrEvent: EventACK,
|
||||||
},
|
},
|
||||||
|
REQHttpGet: methodREQHttpGet{
|
||||||
|
commandOrEvent: EventACK,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -372,7 +379,7 @@ func (m methodREQTextToFile) handler(proc process, message Message, node string)
|
||||||
|
|
||||||
// Open file and write data.
|
// Open file and write data.
|
||||||
file := filepath.Join(folderTree, fileName)
|
file := filepath.Join(folderTree, fileName)
|
||||||
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_CREATE, 0755)
|
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error: methodEventTextLogging.handler: failed to open file: %v\n", err)
|
log.Printf("error: methodEventTextLogging.handler: failed to open file: %v\n", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -651,3 +658,79 @@ func (m methodREQTextToConsole) handler(proc process, message Message, node stri
|
||||||
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||||||
return ackMsg, nil
|
return ackMsg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---
|
||||||
|
|
||||||
|
type methodREQHttpGet struct {
|
||||||
|
commandOrEvent CommandOrEvent
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m methodREQHttpGet) getKind() CommandOrEvent {
|
||||||
|
return m.commandOrEvent
|
||||||
|
}
|
||||||
|
|
||||||
|
// handler to run a Http Get.
|
||||||
|
func (m methodREQHttpGet) handler(proc process, message Message, node string) ([]byte, error) {
|
||||||
|
log.Printf("<--- REQHttpGet received from: %v, containing: %v", message.FromNode, message.Data)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
url := message.Data[0]
|
||||||
|
|
||||||
|
client := http.Client{
|
||||||
|
Timeout: time.Second * 5,
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout))
|
||||||
|
|
||||||
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||||
|
if err != nil {
|
||||||
|
er := fmt.Errorf("error: NewRequest failed: %v, bailing out: %v", err, message)
|
||||||
|
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
outCh := make(chan []byte)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
resp, err := client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
er := fmt.Errorf("error: client.Do failed: %v, bailing out: %v", err, message)
|
||||||
|
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != 200 {
|
||||||
|
cancel()
|
||||||
|
er := fmt.Errorf("error: not 200, where %#v, bailing out: %v", resp.StatusCode, message)
|
||||||
|
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("error: ReadAll failed: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
outCh <- b
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
cancel()
|
||||||
|
er := fmt.Errorf("error: method timed out %v", message)
|
||||||
|
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
|
||||||
|
case out := <-outCh:
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
// Prepare and queue for sending a new message with the output
|
||||||
|
// of the action executed.
|
||||||
|
newReplyMessage(proc, message, REQTextToFile, out)
|
||||||
|
}
|
||||||
|
|
||||||
|
}()
|
||||||
|
|
||||||
|
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||||||
|
return ackMsg, nil
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue