From efb49b7193382491903570a44347afd33344837c Mon Sep 17 00:00:00 2001 From: postmannen Date: Fri, 9 Apr 2021 18:20:04 +0200 Subject: [PATCH] Implemented arguments structure for op commands --- example/toShip2-REQOpCmdStart_REQHttpGet.json | 21 +++ example/toShip2-REQOpCmdStop_REQHttpGet.json | 22 +++ example/toShip2-REQOpPs.json | 17 +++ message_and_subject.go | 14 +- subscriber_method_types.go | 126 ++++++++++-------- 5 files changed, 145 insertions(+), 55 deletions(-) create mode 100644 example/toShip2-REQOpCmdStart_REQHttpGet.json create mode 100644 example/toShip2-REQOpCmdStop_REQHttpGet.json create mode 100644 example/toShip2-REQOpPs.json diff --git a/example/toShip2-REQOpCmdStart_REQHttpGet.json b/example/toShip2-REQOpCmdStart_REQHttpGet.json new file mode 100644 index 0000000..cd18c0d --- /dev/null +++ b/example/toShip2-REQOpCmdStart_REQHttpGet.json @@ -0,0 +1,21 @@ +[ + { + "directory":"opcommand_logs", + "fileExtension": ".log", + "toNode": "ship2", + "data": [], + "method":"REQOpCommand", + "operation":{ + "opCmd":"startProc", + "opArg": { + "method": "REQHttpGet", + "allowedNodes": ["central","node1"] + } + }, + "timeout":3, + "retries":3, + "requestTimeout":3, + "requestRetries":3, + "MethodTimeout": 7 + } +] \ No newline at end of file diff --git a/example/toShip2-REQOpCmdStop_REQHttpGet.json b/example/toShip2-REQOpCmdStop_REQHttpGet.json new file mode 100644 index 0000000..4417dfc --- /dev/null +++ b/example/toShip2-REQOpCmdStop_REQHttpGet.json @@ -0,0 +1,22 @@ +[ + { + "directory":"opcommand_logs", + "fileExtension": ".log", + "toNode": "ship2", + "data": [], + "method":"REQOpCommand", + "operation":{ + "opCmd":"stopProc", + "opArg": { + "method": "REQHttpGet", + "kind": "subscriber", + "receivingNode": "ship2" + } + }, + "timeout":3, + "retries":3, + "requestTimeout":3, + "requestRetries":3, + "MethodTimeout": 7 + } +] \ No newline at end of file diff --git a/example/toShip2-REQOpPs.json b/example/toShip2-REQOpPs.json new file mode 100644 index 0000000..939af97 --- /dev/null +++ b/example/toShip2-REQOpPs.json @@ -0,0 +1,17 @@ +[ + { + "directory":"opcommand_logs", + "fileExtension": ".log", + "toNode": "ship2", + "data": [], + "method":"REQOpCommand", + "operation":{ + "opCmd":"ps" + }, + "timeout":3, + "retries":3, + "requestTimeout":3, + "requestRetries":3, + "MethodTimeout": 7 + } +] \ No newline at end of file diff --git a/message_and_subject.go b/message_and_subject.go index 7956590..22060f6 100644 --- a/message_and_subject.go +++ b/message_and_subject.go @@ -3,6 +3,7 @@ package steward import ( "bytes" "encoding/gob" + "encoding/json" "fmt" "log" "os" @@ -39,9 +40,11 @@ type Message struct { // on a file being saved as the result of data being handled // by a method handler. FileExtension string `json:"fileExtension" yaml:"fileExtension"` + // operation are used to give an opCmd and opArg's. + Operation Operation `json:"operation"` // PreviousMessage are used for example if a reply message is // generated and we also need a copy of thedetails of the the - // initial request message + // initial request message. PreviousMessage *Message // done is used to signal when a message is fully processed. @@ -51,6 +54,15 @@ type Message struct { done chan struct{} } +// --- + +type Operation struct { + OpCmd string `json:"opCmd"` + OpArg json.RawMessage `json:"opArg"` +} + +// --- + // gobEncodePayload will encode the message structure into gob // binary format. func gobEncodeMessage(m Message) ([]byte, error) { diff --git a/subscriber_method_types.go b/subscriber_method_types.go index e022380..9254eea 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -34,6 +34,7 @@ package steward import ( "context" + "encoding/json" "fmt" "io" "log" @@ -235,6 +236,17 @@ func (m methodREQOpCommand) getKind() CommandOrEvent { return m.commandOrEvent } +type OpCmdStartProc struct { + Method Method `json:"method"` + AllowedNodes []node `json:"allowedNodes"` +} + +type OpCmdStopProc struct { + RecevingNode node `json:"receivingNode"` + Method Method `json:"method"` + Kind processKind `json:"kind"` +} + // handler to run a CLI command with timeout context. The handler will // return the output of the command run back to the calling publisher // in the ack message. @@ -242,9 +254,14 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri go func() { out := []byte{} - switch { + // unmarshal the json.RawMessage field called OpArgs. + // + // Dst interface is the generic type to Unmarshal OpArgs into, and we will + // set the type it should contain depending on the value specified in Cmd. + var dst interface{} - case message.Data[0] == "ps": + switch message.Operation.OpCmd { + case "ps": proc.processes.mu.Lock() // Loop the the processes map, and find all that is active to // be returned in the reply message. @@ -255,31 +272,52 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri } proc.processes.mu.Unlock() - case message.Data[0] == "startProc": - if len(message.Data) < 2 { + case "startProc": + // Set the interface type dst to &OpStart. + dst = &OpCmdStartProc{} + + err := json.Unmarshal(message.Operation.OpArg, &dst) + if err != nil { + log.Printf("error: outer unmarshal: %v\n", err) + } + + // Assert it into the correct non pointer value. + arg := *dst.(*OpCmdStartProc) + + //fmt.Printf(" ** Content of Arg = %#v\n", arg) + + if len(arg.AllowedNodes) == 0 { er := fmt.Errorf("error: startProc: no allowed publisher nodes specified: %v" + fmt.Sprint(message)) sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) return } - newMethod := Method(message.Data[1]) - - // We need to convert the []string to []node - aps := message.Data[2:] - var allowedPublishers []node - for _, v := range aps { - allowedPublishers = append(allowedPublishers, node(v)) + if arg.Method == "" { + er := fmt.Errorf("error: startProc: no method specified: %v" + fmt.Sprint(message)) + sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + return } - sub := newSubject(newMethod, proc.configuration.NodeName) - procNew := newProcess(proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, allowedPublishers, nil) + // Create the process and start it. + sub := newSubject(arg.Method, proc.configuration.NodeName) + procNew := newProcess(proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, arg.AllowedNodes, nil) go procNew.spawnWorker(proc.processes, proc.natsConn) er := fmt.Errorf("info: startProc: started %v on %v", sub, message.ToNode) sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) - case message.Data[0] == "stopProc": - fmt.Printf(" ** DEBUG 0: got stopProc\n") + case "stopProc": + // Set the interface type dst to &OpStart. + dst = &OpCmdStopProc{} + + err := json.Unmarshal(message.Operation.OpArg, &dst) + if err != nil { + log.Printf("error: outer unmarshal: %v\n", err) + } + + // Assert it into the correct non pointer value. + arg := *dst.(*OpCmdStopProc) + // Data layout: OPCommand, Method, publisher/subscriber, receivingNode // // The processes can be either publishers or subscribers. The subject name @@ -289,42 +327,20 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri // publisher process will have the name of the node to receive the message, // and not just the local node name as with subscriber processes. // receive the message we need to specify - if len(message.Data) < 4 { - fmt.Printf(`error: DEBUG: stopProc: not enough data values. want "", "", "","": %v` + fmt.Sprint(message)) - er := fmt.Errorf(`error: stopProc: not enough data values. want "", "", "": %v` + fmt.Sprint(message)) - sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) - return - } + // Process name example: ship2.REQTextToLogFile.EventACK_subscriber - // func (s Subject) name() subjectName { - // return subjectName(fmt.Sprintf("%s.%s.%s", s.ToNode, s.Method, s.CommandOrEvent))'' - // - // pn = processNameGet(p.subject.name(), processKindSubscriber) + sub := newSubject(arg.Method, string(arg.RecevingNode)) + processName := processNameGet(sub.name(), arg.Kind) + // fmt.Printf(" ** DEBUG1: processName: %v\n", processName) - toStopMethod := Method(message.Data[1]) - pubOrSub := processKind(message.Data[2]) - recevingNode := processKind(message.Data[3]) - // ..check if valid - - sub := newSubject(toStopMethod, string(recevingNode)) - processName := processNameGet(sub.name(), pubOrSub) - // ..check if valid - fmt.Printf(" ** DEBUG1: processName: %v\n", processName) - - fmt.Printf(" ** DEBUG1.1: Before mutex lock\n") proc.processes.mu.Lock() - fmt.Printf(" ** DEBUG1.2: After mutex lock\n") - for k, v := range proc.processes.active { - fmt.Printf(" ** DEBUG1.3: MAP: k = %v, v = %v\n", k, v.processKind) - } + // for k, v := range proc.processes.active { + // fmt.Printf(" ** DEBUG1.3: MAP: k = %v, v = %v\n", k, v.processKind) + // } toStopProc, ok := proc.processes.active[processName] - fmt.Printf(" ** DEBUG2.1: toStopProc: %v\n", toStopProc) if ok { - fmt.Printf(" ** DEBUG2.2: toStopProc: %v\n", toStopProc) - fmt.Printf(" ** STOP: processName: %v\n", processName) - fmt.Printf(" ** STOP: toStopProc: %v\n", toStopProc) // Delete the process from the processes map delete(proc.processes.active, processName) // Stop started go routines that belong to the process. @@ -334,21 +350,23 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri if err != nil { log.Printf(" ** Error: failed to stop *nats.Subscription: %v\n", err) } + + er := fmt.Errorf("info: stopProc: stoped %v on %v", sub, message.ToNode) + sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + + newReplyMessage(proc, message, REQTextToLogFile, []byte(er.Error())) + + } else { + er := fmt.Errorf("error: stopProc: did not find process to stop: %v on %v", sub, message.ToNode) + sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + + newReplyMessage(proc, message, REQTextToLogFile, []byte(er.Error())) } + proc.processes.mu.Unlock() - er := fmt.Errorf("info: stopProc: stoped %v on %v", sub, message.ToNode) - sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) - - default: - fmt.Printf("error: no such OpCommand specified: " + message.Data[0]) - er := fmt.Errorf("error: no such OpCommand specified: " + message.Data[0]) - sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) - return } - // Prepare and queue for sending a new message with the output - // of the action executed. newReplyMessage(proc, message, REQTextToLogFile, out) }()