mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
request and reply methods for opcommand
This commit is contained in:
parent
6c3ac318dc
commit
66f89cd731
5 changed files with 113 additions and 3 deletions
|
@ -38,8 +38,8 @@ SubscribersDataFolder = "./data"
|
|||
Values = ["*"]
|
||||
|
||||
[StartSubOpCommand]
|
||||
OK = false
|
||||
Values = []
|
||||
OK = true
|
||||
Values = ["*"]
|
||||
|
||||
[StartSubSayHello]
|
||||
OK = true
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
[
|
||||
{
|
||||
|
||||
"toNode": "ship1",
|
||||
"toNode": "central",
|
||||
"data": ["ps"],
|
||||
"method":"OpCommand",
|
||||
"timeout":3,
|
11
example/OpCommandRequest.json
Normal file
11
example/OpCommandRequest.json
Normal file
|
@ -0,0 +1,11 @@
|
|||
[
|
||||
{
|
||||
|
||||
"toNode": "ship1",
|
||||
"data": ["psa"],
|
||||
"method":"OpCommandRequest",
|
||||
"timeout":3,
|
||||
"retries":3,
|
||||
"MethodTimeout": 7
|
||||
}
|
||||
]
|
|
@ -22,6 +22,20 @@ func (s *server) ProcessesStart() {
|
|||
}
|
||||
}
|
||||
|
||||
{
|
||||
fmt.Printf("Starting OpCommandRequest subscriber: %#v\n", s.nodeName)
|
||||
sub := newSubject(OpCommandRequest, CommandACK, s.nodeName)
|
||||
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil)
|
||||
go proc.spawnWorker(s)
|
||||
}
|
||||
|
||||
{
|
||||
fmt.Printf("Starting OpCommandReply subscriber: %#v\n", s.nodeName)
|
||||
sub := newSubject(OpCommandReply, CommandACK, s.nodeName)
|
||||
proc := newProcess(s.processes, s.toRingbufferCh, s.configuration, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}, nil)
|
||||
go proc.spawnWorker(s)
|
||||
}
|
||||
|
||||
// Start a subscriber for CLICommand messages
|
||||
if s.configuration.StartSubCLICommand.OK {
|
||||
{
|
||||
|
|
|
@ -52,6 +52,10 @@ type Method string
|
|||
const (
|
||||
// Command for client operation of the system
|
||||
OpCommand Method = "OpCommand"
|
||||
// Command for client operation request of the system
|
||||
OpCommandRequest Method = "OpCommandRequest"
|
||||
// Command for client operation reply from a node
|
||||
OpCommandReply Method = "OpCommandReply"
|
||||
// Execute a CLI command in for example bash or cmd.
|
||||
// This is a command type, so the output of the command executed
|
||||
// will directly showed in the ACK message received.
|
||||
|
@ -119,6 +123,12 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
|
|||
OpCommand: methodOpCommand{
|
||||
commandOrEvent: CommandACK,
|
||||
},
|
||||
OpCommandRequest: methodOpCommandRequest{
|
||||
commandOrEvent: CommandACK,
|
||||
},
|
||||
OpCommandReply: methodOpCommandReply{
|
||||
commandOrEvent: CommandACK,
|
||||
},
|
||||
CLICommand: methodCLICommand{
|
||||
commandOrEvent: CommandACK,
|
||||
},
|
||||
|
@ -229,6 +239,81 @@ func (m methodOpCommand) handler(proc process, message Message, node string) ([]
|
|||
|
||||
// -----
|
||||
|
||||
type methodOpCommandRequest struct {
|
||||
commandOrEvent CommandOrEvent
|
||||
}
|
||||
|
||||
func (m methodOpCommandRequest) getKind() CommandOrEvent {
|
||||
return m.commandOrEvent
|
||||
}
|
||||
|
||||
// handler to run a CLI command with timeout context. The handler will
|
||||
// return the output of the command run back to the calling publisher
|
||||
// in the ack message.
|
||||
func (m methodOpCommandRequest) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
go func() {
|
||||
out := []byte{}
|
||||
|
||||
switch {
|
||||
case message.Data[0] == "ps":
|
||||
proc.processes.mu.Lock()
|
||||
for _, v := range proc.processes.active {
|
||||
s := fmt.Sprintf("* proc - : %v, id: %v, name: %v, allowed from: %s\n", v.processKind, v.processID, v.subject.name(), v.allowedReceivers)
|
||||
sb := []byte(s)
|
||||
out = append(out, sb...)
|
||||
}
|
||||
proc.processes.mu.Unlock()
|
||||
|
||||
default:
|
||||
er := fmt.Errorf("error: no such OpCommand specified: " + message.Data[0])
|
||||
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
|
||||
return
|
||||
}
|
||||
|
||||
//--
|
||||
// Create a new message for the reply, and put it on the
|
||||
// ringbuffer to be published.
|
||||
newMsg := Message{
|
||||
ToNode: message.FromNode,
|
||||
Data: []string{string(out)},
|
||||
Method: OpCommandReply,
|
||||
Timeout: 3,
|
||||
Retries: 3,
|
||||
}
|
||||
fmt.Printf("** %#v\n", newMsg)
|
||||
|
||||
nSAM, err := newSAM(newMsg)
|
||||
if err != nil {
|
||||
// In theory the system should drop the message before it reaches here.
|
||||
log.Printf("error: methodCLICommandRequest: %v\n", err)
|
||||
}
|
||||
proc.toRingbufferCh <- []subjectAndMessage{nSAM}
|
||||
//--
|
||||
}()
|
||||
|
||||
ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n", node, message.ID))
|
||||
return ackMsg, nil
|
||||
}
|
||||
|
||||
// -----
|
||||
|
||||
type methodOpCommandReply struct {
|
||||
commandOrEvent CommandOrEvent
|
||||
}
|
||||
|
||||
func (m methodOpCommandReply) getKind() CommandOrEvent {
|
||||
return m.commandOrEvent
|
||||
}
|
||||
|
||||
func (m methodOpCommandReply) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
log.Printf("<--- OpCommand Reply received from: %v, containing: %v", message.FromNode, message.Data)
|
||||
|
||||
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||||
return ackMsg, nil
|
||||
}
|
||||
|
||||
// ---
|
||||
|
||||
type methodCLICommand struct {
|
||||
commandOrEvent CommandOrEvent
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue