1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

removed methods with result in ack message

This commit is contained in:
postmannen 2021-04-04 07:33:18 +02:00
parent e4303bb77f
commit 9a5e37687f
14 changed files with 4 additions and 403 deletions

View file

@ -101,8 +101,6 @@ type Configuration struct {
StartSubErrorLog flagNodeSlice
// Subscriber for hello messages
StartSubSayHello flagNodeSlice
// Subscriber for CLI Commands
StartSubCLICommand flagNodeSlice
// Subscriber for text logging
StartSubTextLogging flagNodeSlice
// Subscriber for Echo Request
@ -115,8 +113,6 @@ type Configuration struct {
StartSubCLICommandRequestNOSEQ flagNodeSlice
// Subscriber for CLICommandReply
StartSubCLICommandReply flagNodeSlice
// Subscriber for OpCommand
StartSubOpCommand flagNodeSlice
}
func NewConfiguration() *Configuration {
@ -138,14 +134,12 @@ func newConfigurationDefaults() Configuration {
CentralNodeName: "",
StartSubErrorLog: flagNodeSlice{Values: []node{}},
StartSubSayHello: flagNodeSlice{OK: true, Values: []node{"*"}},
StartSubCLICommand: flagNodeSlice{OK: true, Values: []node{"*"}},
StartSubTextLogging: flagNodeSlice{OK: true, Values: []node{"*"}},
StartSubEchoRequest: flagNodeSlice{OK: true, Values: []node{"*"}},
StartSubEchoReply: flagNodeSlice{OK: true, Values: []node{"*"}},
StartSubCLICommandRequest: flagNodeSlice{OK: true, Values: []node{"*"}},
StartSubCLICommandRequestNOSEQ: flagNodeSlice{OK: true, Values: []node{"*"}},
StartSubCLICommandReply: flagNodeSlice{OK: true, Values: []node{"*"}},
StartSubOpCommand: flagNodeSlice{OK: true, Values: []node{"*"}},
}
return c
}
@ -179,14 +173,12 @@ func (c *Configuration) CheckFlags() error {
flag.Var(&c.StartSubErrorLog, "startSubErrorLog", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
flag.Var(&c.StartSubSayHello, "startSubSayHello", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
flag.Var(&c.StartSubCLICommand, "startSubCLICommand", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
flag.Var(&c.StartSubTextLogging, "startSubTextLogging", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
flag.Var(&c.StartSubEchoRequest, "startSubEchoRequest", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
flag.Var(&c.StartSubEchoReply, "startSubEchoReply", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
flag.Var(&c.StartSubCLICommandRequest, "startSubCLICommandRequest", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
flag.Var(&c.StartSubCLICommandRequestNOSEQ, "startSubCLICommandRequestNOSEQ", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
flag.Var(&c.StartSubCLICommandReply, "startSubCLICommandReply", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
flag.Var(&c.StartSubOpCommand, "startSubOpCommand", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
flag.Parse()

View file

@ -9,10 +9,6 @@ PromHostAndPort = ":2112"
StartPubSayHello = 0
SubscribersDataFolder = "./data"
[StartSubCLICommand]
OK = true
Values = ["*"]
[StartSubCLICommandReply]
OK = true
Values = ["*"]
@ -37,10 +33,6 @@ SubscribersDataFolder = "./data"
OK = true
Values = ["*"]
[StartSubOpCommand]
OK = true
Values = ["*"]
[StartSubSayHello]
OK = true
Values = ["*"]

View file

@ -1,11 +0,0 @@
[
{
"toNode": "central",
"data": ["ps"],
"method":"OpCommand",
"timeout":3,
"retries":3,
"methodTimeout": 7
}
]

View file

@ -1,10 +0,0 @@
[
{
"toNode": "ship1",
"data": ["bash","-c","curl http://localhost:8888/metrics"],
"method":"CLICommand",
"timeout":3,
"retries":3
}
]

View file

@ -1,18 +0,0 @@
[
{
"toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":5,
"retries":10
},
{
"toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":5,
"retries":10
}
]

View file

@ -1,100 +0,0 @@
[
{
"toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
}
]

View file

@ -1,11 +0,0 @@
[
{
"toNode": "ship1",
"data": ["bash","-c","sleep 5 && netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3,
"MethodTimeout": 7
}
]

View file

@ -1,100 +0,0 @@
[
{
"toNode": "ship2",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship2",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship2",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship2",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship2",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship2",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship2",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship2",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship2",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship2",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship2",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship2",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship2",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
},
{
"toNode": "ship2",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
}
]

View file

@ -1,10 +0,0 @@
[
{
"toNode": "ship2",
"data": ["bash","-c","netstat -an|grep -i listen"],
"method":"CLICommand",
"timeout":3,
"retries":3
}
]

View file

@ -1,8 +0,0 @@
[
{
"toNode": "ship2",
"data": ["bash","-c","tree ../"],
"method":"CLICommand"
}
]

View file

@ -183,7 +183,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
msg := &nats.Msg{
Subject: string(p.subject.name()),
// Subject: fmt.Sprintf("%s.%s.%s", proc.node, "command", "CLICommand"),
// Subject: fmt.Sprintf("%s.%s.%s", proc.node, "command", "CLICommandRequest"),
// Structure of the reply message are:
// reply.<nodename>.<message type>.<method>
Reply: fmt.Sprintf("reply.%s", p.subject.name()),

View file

@ -12,16 +12,6 @@ func (s *server) ProcessesStart() {
// --- Subscriber services that can be started via flags
// Start a subscriber for OPCommand messages
if s.configuration.StartSubOpCommand.OK {
{
fmt.Printf("Starting OpCommand subscriber: %#v\n", s.nodeName)
sub := newSubject(OpCommand, s.nodeName)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubOpCommand.Values, nil)
go proc.spawnWorker(s)
}
}
{
fmt.Printf("Starting OpCommandRequest subscriber: %#v\n", s.nodeName)
sub := newSubject(OpCommandRequest, s.nodeName)
@ -29,17 +19,6 @@ func (s *server) ProcessesStart() {
go proc.spawnWorker(s)
}
// Start a subscriber for CLICommand messages
if s.configuration.StartSubCLICommand.OK {
{
fmt.Printf("Starting CLICommand subscriber: %#v\n", s.nodeName)
sub := newSubject(CLICommand, s.nodeName)
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, s.configuration.StartSubCLICommand.Values, nil)
// fmt.Printf("*** %#v\n", proc)
go proc.spawnWorker(s)
}
}
// Start a subscriber for textLogging messages
if s.configuration.StartSubTextLogging.OK {
{

View file

@ -7,15 +7,15 @@
// Overall structure example shown below.
//
// ---
// type methodCommandCLICommand struct {
// type methodCommandCLICommandRequest struct {
// commandOrEvent CommandOrEvent
// }
//
// func (m methodCommandCLICommand) getKind() CommandOrEvent {
// func (m methodCommandCLICommandRequest) getKind() CommandOrEvent {
// return m.commandOrEvent
// }
//
// func (m methodCommandCLICommand) handler(s *server, message Message, node string) ([]byte, error) {
// func (m methodCommandCLICommandRequest) handler(s *server, message Message, node string) ([]byte, error) {
// ...
// ...
// ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n%s---", node, message.ID, out))
@ -50,17 +50,9 @@ type Method string
// The constants that will be used throughout the system for
// when specifying what kind of Method to send or work with.
const (
// Command for client operation of the system
OpCommand Method = "OpCommand"
// Command for client operation request of the system
OpCommandRequest Method = "OpCommandRequest"
// Execute a CLI command in for example bash or cmd.
// This is a command type, so the output of the command executed
// will directly showed in the ACK message received.
// The data field is a slice of strings where the first string
// value should be the command, and the following the arguments.
CLICommand Method = "CLICommand"
// Execute a CLI command in for example bash or cmd.
// This is an event type, where a message will be sent to a
// node with the command to execute and an ACK will be replied
// if it was delivered succesfully. The output of the command
@ -121,15 +113,9 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
// Event, Used to communicate that an action has been performed.
ma := MethodsAvailable{
methodhandlers: map[Method]methodHandler{
OpCommand: methodOpCommand{
commandOrEvent: CommandACK,
},
OpCommandRequest: methodOpCommandRequest{
commandOrEvent: CommandACK,
},
CLICommand: methodCLICommand{
commandOrEvent: CommandACK,
},
CLICommandRequest: methodCLICommandRequest{
commandOrEvent: CommandACK,
},
@ -200,41 +186,6 @@ type methodHandler interface {
getKind() CommandOrEvent
}
// -----
type methodOpCommand struct {
commandOrEvent CommandOrEvent
}
func (m methodOpCommand) getKind() CommandOrEvent {
return m.commandOrEvent
}
// handler to run a CLI command with timeout context. The handler will
// return the output of the command run back to the calling publisher
// in the ack message.
func (m methodOpCommand) handler(proc process, message Message, node string) ([]byte, error) {
out := []byte{}
switch {
case message.Data[0] == "ps":
proc.processes.mu.Lock()
for _, v := range proc.processes.active {
s := fmt.Sprintf("* proc - : %v, id: %v, name: %v, allowed from: %s\n", v.processKind, v.processID, v.subject.name(), v.allowedReceivers)
sb := []byte(s)
out = append(out, sb...)
}
proc.processes.mu.Unlock()
default:
out = []byte("error: no such OpCommand specified: " + message.Data[0])
ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v, error: %s\n---\n", node, message.ID, out))
return ackMsg, nil
}
ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n%s", node, message.ID, out))
return ackMsg, nil
}
// -----
type methodOpCommandRequest struct {
@ -309,51 +260,6 @@ func newReplyMessage(proc process, message Message, method Method, outData []byt
//--
}
// -----
type methodCLICommand struct {
commandOrEvent CommandOrEvent
}
func (m methodCLICommand) getKind() CommandOrEvent {
return m.commandOrEvent
}
// handler to run a CLI command with timeout context. The handler will
// return the output of the command run back to the calling publisher
// in the ack message.
func (m methodCLICommand) handler(proc process, message Message, node string) ([]byte, error) {
out := []byte{}
c := message.Data[0]
a := message.Data[1:]
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout))
outCh := make(chan []byte)
go func() {
cmd := exec.CommandContext(ctx, c, a...)
out, err := cmd.Output()
if err != nil {
log.Printf("error: %v\n", err)
}
outCh <- out
}()
select {
case <-ctx.Done():
cancel()
er := fmt.Errorf("error: method timed out %v", message)
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
case out = <-outCh:
cancel()
}
ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n%s---", node, message.ID, out))
return ackMsg, nil
}
type methodTextLogging struct {
commandOrEvent CommandOrEvent
}