mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
Implemented arguments structure for op commands
This commit is contained in:
parent
f1359e45d9
commit
efb49b7193
5 changed files with 145 additions and 55 deletions
21
example/toShip2-REQOpCmdStart_REQHttpGet.json
Normal file
21
example/toShip2-REQOpCmdStart_REQHttpGet.json
Normal file
|
@ -0,0 +1,21 @@
|
|||
[
|
||||
{
|
||||
"directory":"opcommand_logs",
|
||||
"fileExtension": ".log",
|
||||
"toNode": "ship2",
|
||||
"data": [],
|
||||
"method":"REQOpCommand",
|
||||
"operation":{
|
||||
"opCmd":"startProc",
|
||||
"opArg": {
|
||||
"method": "REQHttpGet",
|
||||
"allowedNodes": ["central","node1"]
|
||||
}
|
||||
},
|
||||
"timeout":3,
|
||||
"retries":3,
|
||||
"requestTimeout":3,
|
||||
"requestRetries":3,
|
||||
"MethodTimeout": 7
|
||||
}
|
||||
]
|
22
example/toShip2-REQOpCmdStop_REQHttpGet.json
Normal file
22
example/toShip2-REQOpCmdStop_REQHttpGet.json
Normal file
|
@ -0,0 +1,22 @@
|
|||
[
|
||||
{
|
||||
"directory":"opcommand_logs",
|
||||
"fileExtension": ".log",
|
||||
"toNode": "ship2",
|
||||
"data": [],
|
||||
"method":"REQOpCommand",
|
||||
"operation":{
|
||||
"opCmd":"stopProc",
|
||||
"opArg": {
|
||||
"method": "REQHttpGet",
|
||||
"kind": "subscriber",
|
||||
"receivingNode": "ship2"
|
||||
}
|
||||
},
|
||||
"timeout":3,
|
||||
"retries":3,
|
||||
"requestTimeout":3,
|
||||
"requestRetries":3,
|
||||
"MethodTimeout": 7
|
||||
}
|
||||
]
|
17
example/toShip2-REQOpPs.json
Normal file
17
example/toShip2-REQOpPs.json
Normal file
|
@ -0,0 +1,17 @@
|
|||
[
|
||||
{
|
||||
"directory":"opcommand_logs",
|
||||
"fileExtension": ".log",
|
||||
"toNode": "ship2",
|
||||
"data": [],
|
||||
"method":"REQOpCommand",
|
||||
"operation":{
|
||||
"opCmd":"ps"
|
||||
},
|
||||
"timeout":3,
|
||||
"retries":3,
|
||||
"requestTimeout":3,
|
||||
"requestRetries":3,
|
||||
"MethodTimeout": 7
|
||||
}
|
||||
]
|
|
@ -3,6 +3,7 @@ package steward
|
|||
import (
|
||||
"bytes"
|
||||
"encoding/gob"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
|
@ -39,9 +40,11 @@ type Message struct {
|
|||
// on a file being saved as the result of data being handled
|
||||
// by a method handler.
|
||||
FileExtension string `json:"fileExtension" yaml:"fileExtension"`
|
||||
// operation are used to give an opCmd and opArg's.
|
||||
Operation Operation `json:"operation"`
|
||||
// PreviousMessage are used for example if a reply message is
|
||||
// generated and we also need a copy of thedetails of the the
|
||||
// initial request message
|
||||
// initial request message.
|
||||
PreviousMessage *Message
|
||||
|
||||
// done is used to signal when a message is fully processed.
|
||||
|
@ -51,6 +54,15 @@ type Message struct {
|
|||
done chan struct{}
|
||||
}
|
||||
|
||||
// ---
|
||||
|
||||
type Operation struct {
|
||||
OpCmd string `json:"opCmd"`
|
||||
OpArg json.RawMessage `json:"opArg"`
|
||||
}
|
||||
|
||||
// ---
|
||||
|
||||
// gobEncodePayload will encode the message structure into gob
|
||||
// binary format.
|
||||
func gobEncodeMessage(m Message) ([]byte, error) {
|
||||
|
|
|
@ -34,6 +34,7 @@ package steward
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
|
@ -235,6 +236,17 @@ func (m methodREQOpCommand) getKind() CommandOrEvent {
|
|||
return m.commandOrEvent
|
||||
}
|
||||
|
||||
type OpCmdStartProc struct {
|
||||
Method Method `json:"method"`
|
||||
AllowedNodes []node `json:"allowedNodes"`
|
||||
}
|
||||
|
||||
type OpCmdStopProc struct {
|
||||
RecevingNode node `json:"receivingNode"`
|
||||
Method Method `json:"method"`
|
||||
Kind processKind `json:"kind"`
|
||||
}
|
||||
|
||||
// handler to run a CLI command with timeout context. The handler will
|
||||
// return the output of the command run back to the calling publisher
|
||||
// in the ack message.
|
||||
|
@ -242,9 +254,14 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
|
|||
go func() {
|
||||
out := []byte{}
|
||||
|
||||
switch {
|
||||
// unmarshal the json.RawMessage field called OpArgs.
|
||||
//
|
||||
// Dst interface is the generic type to Unmarshal OpArgs into, and we will
|
||||
// set the type it should contain depending on the value specified in Cmd.
|
||||
var dst interface{}
|
||||
|
||||
case message.Data[0] == "ps":
|
||||
switch message.Operation.OpCmd {
|
||||
case "ps":
|
||||
proc.processes.mu.Lock()
|
||||
// Loop the the processes map, and find all that is active to
|
||||
// be returned in the reply message.
|
||||
|
@ -255,31 +272,52 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
|
|||
}
|
||||
proc.processes.mu.Unlock()
|
||||
|
||||
case message.Data[0] == "startProc":
|
||||
if len(message.Data) < 2 {
|
||||
case "startProc":
|
||||
// Set the interface type dst to &OpStart.
|
||||
dst = &OpCmdStartProc{}
|
||||
|
||||
err := json.Unmarshal(message.Operation.OpArg, &dst)
|
||||
if err != nil {
|
||||
log.Printf("error: outer unmarshal: %v\n", err)
|
||||
}
|
||||
|
||||
// Assert it into the correct non pointer value.
|
||||
arg := *dst.(*OpCmdStartProc)
|
||||
|
||||
//fmt.Printf(" ** Content of Arg = %#v\n", arg)
|
||||
|
||||
if len(arg.AllowedNodes) == 0 {
|
||||
er := fmt.Errorf("error: startProc: no allowed publisher nodes specified: %v" + fmt.Sprint(message))
|
||||
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
|
||||
return
|
||||
}
|
||||
|
||||
newMethod := Method(message.Data[1])
|
||||
|
||||
// We need to convert the []string to []node
|
||||
aps := message.Data[2:]
|
||||
var allowedPublishers []node
|
||||
for _, v := range aps {
|
||||
allowedPublishers = append(allowedPublishers, node(v))
|
||||
if arg.Method == "" {
|
||||
er := fmt.Errorf("error: startProc: no method specified: %v" + fmt.Sprint(message))
|
||||
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
|
||||
return
|
||||
}
|
||||
|
||||
sub := newSubject(newMethod, proc.configuration.NodeName)
|
||||
procNew := newProcess(proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, allowedPublishers, nil)
|
||||
// Create the process and start it.
|
||||
sub := newSubject(arg.Method, proc.configuration.NodeName)
|
||||
procNew := newProcess(proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, arg.AllowedNodes, nil)
|
||||
go procNew.spawnWorker(proc.processes, proc.natsConn)
|
||||
|
||||
er := fmt.Errorf("info: startProc: started %v on %v", sub, message.ToNode)
|
||||
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
|
||||
|
||||
case message.Data[0] == "stopProc":
|
||||
fmt.Printf(" ** DEBUG 0: got stopProc\n")
|
||||
case "stopProc":
|
||||
// Set the interface type dst to &OpStart.
|
||||
dst = &OpCmdStopProc{}
|
||||
|
||||
err := json.Unmarshal(message.Operation.OpArg, &dst)
|
||||
if err != nil {
|
||||
log.Printf("error: outer unmarshal: %v\n", err)
|
||||
}
|
||||
|
||||
// Assert it into the correct non pointer value.
|
||||
arg := *dst.(*OpCmdStopProc)
|
||||
|
||||
// Data layout: OPCommand, Method, publisher/subscriber, receivingNode
|
||||
//
|
||||
// The processes can be either publishers or subscribers. The subject name
|
||||
|
@ -289,42 +327,20 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
|
|||
// publisher process will have the name of the node to receive the message,
|
||||
// and not just the local node name as with subscriber processes.
|
||||
// receive the message we need to specify
|
||||
if len(message.Data) < 4 {
|
||||
fmt.Printf(`error: DEBUG: stopProc: not enough data values. want "<OPCommand>", "<Method>", "<publisher/subscriber>","<receiving nodeName>": %v` + fmt.Sprint(message))
|
||||
er := fmt.Errorf(`error: stopProc: not enough data values. want "<OPCommand>", "<Method>", "<publisher/subscriber>": %v` + fmt.Sprint(message))
|
||||
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
|
||||
return
|
||||
}
|
||||
// Process name example: ship2.REQTextToLogFile.EventACK_subscriber
|
||||
|
||||
// func (s Subject) name() subjectName {
|
||||
// return subjectName(fmt.Sprintf("%s.%s.%s", s.ToNode, s.Method, s.CommandOrEvent))''
|
||||
//
|
||||
// pn = processNameGet(p.subject.name(), processKindSubscriber)
|
||||
sub := newSubject(arg.Method, string(arg.RecevingNode))
|
||||
processName := processNameGet(sub.name(), arg.Kind)
|
||||
// fmt.Printf(" ** DEBUG1: processName: %v\n", processName)
|
||||
|
||||
toStopMethod := Method(message.Data[1])
|
||||
pubOrSub := processKind(message.Data[2])
|
||||
recevingNode := processKind(message.Data[3])
|
||||
// ..check if valid
|
||||
|
||||
sub := newSubject(toStopMethod, string(recevingNode))
|
||||
processName := processNameGet(sub.name(), pubOrSub)
|
||||
// ..check if valid
|
||||
fmt.Printf(" ** DEBUG1: processName: %v\n", processName)
|
||||
|
||||
fmt.Printf(" ** DEBUG1.1: Before mutex lock\n")
|
||||
proc.processes.mu.Lock()
|
||||
fmt.Printf(" ** DEBUG1.2: After mutex lock\n")
|
||||
|
||||
for k, v := range proc.processes.active {
|
||||
fmt.Printf(" ** DEBUG1.3: MAP: k = %v, v = %v\n", k, v.processKind)
|
||||
}
|
||||
// for k, v := range proc.processes.active {
|
||||
// fmt.Printf(" ** DEBUG1.3: MAP: k = %v, v = %v\n", k, v.processKind)
|
||||
// }
|
||||
|
||||
toStopProc, ok := proc.processes.active[processName]
|
||||
fmt.Printf(" ** DEBUG2.1: toStopProc: %v\n", toStopProc)
|
||||
if ok {
|
||||
fmt.Printf(" ** DEBUG2.2: toStopProc: %v\n", toStopProc)
|
||||
fmt.Printf(" ** STOP: processName: %v\n", processName)
|
||||
fmt.Printf(" ** STOP: toStopProc: %v\n", toStopProc)
|
||||
// Delete the process from the processes map
|
||||
delete(proc.processes.active, processName)
|
||||
// Stop started go routines that belong to the process.
|
||||
|
@ -334,21 +350,23 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
|
|||
if err != nil {
|
||||
log.Printf(" ** Error: failed to stop *nats.Subscription: %v\n", err)
|
||||
}
|
||||
|
||||
er := fmt.Errorf("info: stopProc: stoped %v on %v", sub, message.ToNode)
|
||||
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
|
||||
|
||||
newReplyMessage(proc, message, REQTextToLogFile, []byte(er.Error()))
|
||||
|
||||
} else {
|
||||
er := fmt.Errorf("error: stopProc: did not find process to stop: %v on %v", sub, message.ToNode)
|
||||
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
|
||||
|
||||
newReplyMessage(proc, message, REQTextToLogFile, []byte(er.Error()))
|
||||
}
|
||||
|
||||
proc.processes.mu.Unlock()
|
||||
|
||||
er := fmt.Errorf("info: stopProc: stoped %v on %v", sub, message.ToNode)
|
||||
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
|
||||
|
||||
default:
|
||||
fmt.Printf("error: no such OpCommand specified: " + message.Data[0])
|
||||
er := fmt.Errorf("error: no such OpCommand specified: " + message.Data[0])
|
||||
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
|
||||
return
|
||||
}
|
||||
|
||||
// Prepare and queue for sending a new message with the output
|
||||
// of the action executed.
|
||||
newReplyMessage(proc, message, REQTextToLogFile, out)
|
||||
}()
|
||||
|
||||
|
|
Loading…
Reference in a new issue