1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

removed REQOpCmd

This commit is contained in:
postmannen 2021-11-16 20:13:03 +01:00
parent c70b51e1fb
commit f8491df388
5 changed files with 2 additions and 439 deletions

View file

@ -34,7 +34,7 @@ The idea behind Steward is to help out with exactly these issues, allowing you t
- [REQHello](#reqhello)
- [REQErrorLog](#reqerrorlog)
- [Request Methods used for reply messages](#request-methods-used-for-reply-messages)
- [REQToConsole](#reqtoconsole-1)
- [REQToConsole for printing to STDOUT](#reqtoconsole-for-printing-to-stdout)
- [REQToFileAppend](#reqtofileappend)
- [REQToFile](#reqtofile)
- [ReqCliCommand](#reqclicommand-1)
@ -58,9 +58,6 @@ The idea behind Steward is to help out with exactly these issues, allowing you t
- [Example JSON for appending a message of type command into the `socket` file](#example-json-for-appending-a-message-of-type-command-into-the-socket-file)
- [Specify more messages at once do](#specify-more-messages-at-once-do)
- [Send the same message to several hosts by using the toHosts field](#send-the-same-message-to-several-hosts-by-using-the-tohosts-field)
- [Send an Op Command message for process listing with custom timeout and amount of retries](#send-an-op-command-message-for-process-listing-with-custom-timeout-and-amount-of-retries)
- [Send and Op Command to stop a subscriber on a node](#send-and-op-command-to-stop-a-subscriber-on-a-node)
- [Send and Op Command to start a subscriber on a node](#send-and-op-command-to-start-a-subscriber-on-a-node)
- [Tail a log file on a node, and save the result of the tail centrally at the directory specified](#tail-a-log-file-on-a-node-and-save-the-result-of-the-tail-centrally-at-the-directory-specified)
- [Concepts/Ideas](#conceptsideas)
- [Naming](#naming)
@ -496,7 +493,7 @@ This is **not** to be used by users. Use **REQToFileAppend** instead.
### Request Methods used for reply messages
#### REQToConsole
#### REQToConsole for printing to STDOUT
Print the output of the reply message to the STDOUT where the receiving steward instance are running.
@ -863,8 +860,6 @@ directory
// on a file being saved as the result of data being handled
// by a method handler.
fileName
// operation are used to give an opCmd and opArg's.
operation
```
### How to send a Message
@ -943,80 +938,6 @@ The API for sending a message from one node to another node is by sending a stru
]
```
##### Send an Op Command message for process listing with custom timeout and amount of retries
```json
[
{
"directory":"opcommand_logs",
"fileName": "some.log",
"toNode": "ship2",
"data": [],
"method":"REQOpCommand",
"operation":{
"opCmd":"ps"
},
"ACKTimeout":3,
"retries":3,
"replyACKTimeout":3,
"replyRetries":3,
"methodTimeout": 7
}
]
```
##### Send and Op Command to stop a subscriber on a node
```json
[
{
"directory":"opcommand_logs",
"fileName": "some.log",
"toNode": "ship2",
"data": [],
"method":"REQOpCommand",
"operation":{
"opCmd":"stopProc",
"opArg": {
"method": "REQHttpGet",
"kind": "subscriber",
"receivingNode": "ship2"
}
},
"ACKTimeout":3,
"retries":3,
"replyACKTimeout":3,
"replyRetries":3,
"methodTimeout": 7
}
]
```
##### Send and Op Command to start a subscriber on a node
```json
[
{
"directory":"opcommand_logs",
"fileName": "some.log",
"toNode": "ship2",
"data": [],
"method":"REQOpCommand",
"operation":{
"opCmd":"startProc",
"opArg": {
"method": "REQHttpGet"
}
},
"ACKTimeout":3,
"retries":3,
"replyACKTimeout":3,
"replyRetries":3,
"methodTimeout": 7
}
]
```
##### Tail a log file on a node, and save the result of the tail centrally at the directory specified
```json

View file

@ -3,7 +3,6 @@ package steward
import (
"bytes"
"encoding/gob"
"encoding/json"
"fmt"
"log"
"os"
@ -70,8 +69,6 @@ type Message struct {
// on a file being saved as the result of data being handled
// by a method handler.
FileName string `json:"fileName" yaml:"fileName"`
// operation are used to give an opCmd and opArg's.
Operation Operation `json:"operation"`
// PreviousMessage are used for example if a reply message is
// generated and we also need a copy of the details of the the
// initial request message.
@ -99,14 +96,6 @@ type Message struct {
// ---
// operation are used to specify opCmd and opArg's.
type Operation struct {
OpCmd string `json:"opCmd"`
OpArg json.RawMessage `json:"opArg"`
}
// ---
// gobEncodePayload will encode the message structure into gob
// binary format before putting it into a nats message.
func gobEncodeMessage(m Message) ([]byte, error) {

View file

@ -76,14 +76,6 @@ func (p *processes) Start(proc process) {
// --- Subscriber services that can be started via flags
// Allways start the listeners for Op commands
{
log.Printf("Starting REQOpCommand subscriber: %#v\n", proc.node)
sub := newSubject(REQOpCommand, string(proc.node))
proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil)
go proc.spawnWorker(proc.processes, proc.natsConn)
}
{
log.Printf("Starting REQOpProcessList subscriber: %#v\n", proc.node)
sub := newSubject(REQOpProcessList, string(proc.node))

View file

@ -36,7 +36,6 @@ import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
@ -61,10 +60,6 @@ type Method string
const (
// Initial parent method used to start other processes.
REQInitial Method = "REQInitial"
// Command for client operation request of the system. The op
// command to execute shall be given in the data field of the
// message as string value. For example "ps".
REQOpCommand Method = "REQOpCommand"
// Get a list of all the running processes.
REQOpProcessList Method = "REQOpProcessList"
// Start up a process.
@ -140,9 +135,6 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
REQInitial: methodREQInitial{
commandOrEvent: CommandACK,
},
REQOpCommand: methodREQOpCommand{
commandOrEvent: CommandACK,
},
REQOpProcessList: methodREQOpProcessList{
commandOrEvent: CommandACK,
},
@ -339,172 +331,6 @@ type methodHandler interface {
// -----
type methodREQOpCommand struct {
commandOrEvent CommandOrEvent
}
func (m methodREQOpCommand) getKind() CommandOrEvent {
return m.commandOrEvent
}
type OpCmdStartProc struct {
Method Method `json:"method"`
}
type OpCmdStopProc struct {
RecevingNode Node `json:"receivingNode"`
Method Method `json:"method"`
Kind processKind `json:"kind"`
ID int `json:"id"`
}
// handler to run a CLI command with timeout context. The handler will
// return the output of the command run back to the calling publisher
// in the ack message.
func (m methodREQOpCommand) handler(proc process, message Message, nodeName string) ([]byte, error) {
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
out := []byte{}
// unmarshal the json.RawMessage field called OpArgs.
//
// Dst interface is the generic type to Unmarshal OpArgs into, and we will
// set the type it should contain depending on the value specified in Cmd.
var dst interface{}
switch message.Operation.OpCmd {
case "ps":
// Loop the the processes map, and find all that is active to
// be returned in the reply message.
proc.processes.active.mu.Lock()
for _, pTmp := range proc.processes.active.procNames {
s := fmt.Sprintf("%v, proc: %v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), pTmp.processKind, pTmp.processID, pTmp.subject.name())
sb := []byte(s)
out = append(out, sb...)
}
proc.processes.active.mu.Unlock()
case "startProc":
// Set the empty interface type dst to &OpStart.
dst = &OpCmdStartProc{}
err := json.Unmarshal(message.Operation.OpArg, &dst)
if err != nil {
er := fmt.Errorf("error: methodREQOpCommand startProc json.Umarshal failed : %v, OpArg: %v", err, message.Operation.OpArg)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
// Assert it into the correct non pointer value.
arg := *dst.(*OpCmdStartProc)
if arg.Method == "" {
er := fmt.Errorf("error: startProc: no method specified: %v" + fmt.Sprint(message))
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return
}
// Create the process and start it.
sub := newSubject(arg.Method, proc.configuration.NodeName)
procNew := newProcess(proc.ctx, proc.processes.metrics, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil)
go procNew.spawnWorker(proc.processes, proc.natsConn)
er := fmt.Errorf("info: startProc: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
sendInfoLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
case "stopProc":
// Set the interface type dst to &OpStart.
dst = &OpCmdStopProc{}
err := json.Unmarshal(message.Operation.OpArg, &dst)
if err != nil {
er := fmt.Errorf("error: methodREQOpCommand stopProc json.Umarshal failed : %v, message: %v", err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
// Assert it into the correct non pointer value.
arg := *dst.(*OpCmdStopProc)
// Based on the arg values received in the message we create a
// processName structure as used in naming the real processes.
// We can then use this processName to get the real values for the
// actual process we want to stop.
sub := newSubject(arg.Method, string(arg.RecevingNode))
processName := processNameGet(sub.name(), arg.Kind)
// Check if the message contains an id.
err = func() error {
if arg.ID == 0 {
er := fmt.Errorf("error: stopProc: did not find process to stop: %v on %v", sub, message.ToNode)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return er
}
return nil
}()
if err != nil {
er := fmt.Errorf("error: stopProc: err was not nil: %v : %v on %v", err, sub, message.ToNode)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return
}
// Remove the process from the processes active map if found.
proc.processes.active.mu.Lock()
toStopProc, ok := proc.processes.active.procNames[processName]
if ok {
// Delete the process from the processes map
delete(proc.processes.active.procNames, processName)
// Stop started go routines that belong to the process.
toStopProc.ctxCancel()
// Stop subscribing for messages on the process's subject.
err := toStopProc.natsSubscription.Unsubscribe()
if err != nil {
er := fmt.Errorf("error: methodREQOpCommand, toStopProc, failed to stop nats.Subscription: %v, message: %v", err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
// Remove the prometheus label
proc.processes.metrics.promProcessesAllRunning.Delete(prometheus.Labels{"processName": string(processName)})
er := fmt.Errorf("info: stopProc: stopped %v on %v", sub, message.ToNode)
sendInfoLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
newReplyMessage(proc, message, []byte(er.Error()))
} else {
er := fmt.Errorf("error: stopProc: methodREQOpCommand, did not find process to stop: %v on %v", sub, message.ToNode)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
newReplyMessage(proc, message, []byte(er.Error()))
}
proc.processes.active.mu.Unlock()
}
newReplyMessage(proc, message, out)
}()
ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n", proc.node, message.ID))
return ackMsg, nil
}
// ---- New operations
// --- OpProcessList
type methodREQOpProcessList struct {
commandOrEvent CommandOrEvent

View file

@ -148,145 +148,6 @@ func messageSlide(app *tview.Application) tview.Primitive {
case "FileName":
value := ".log"
p.msgInputForm.AddInputField(fieldName, value, 30, nil, nil)
case "Operation":
// Prepare the selectedFunc that will be triggered for the operations
// when a field in the dropdown is selected.
// This selectedFunc will generate the sub fields of the selected
// operation, and will also remove any previously drawn operation
// sub fields from the current form.
values := []string{"ps", "startProc", "stopProc"}
selectedFunc := func(option string, optionIndex int) {
// Prepare the map structure that knows what tview items the
// operation should contain.
// Then we can pick from this map later, to know what fields
// to draw, and also which fields to delete if another operation
// is selected from the dropdown.
type formItem struct {
label string
formItem tview.FormItem
}
operationFormItems := map[string][]formItem{}
operationFormItems["ps"] = nil
operationFormItems["startProc"] = func() []formItem {
formItems := []formItem{}
// Get values values to be used for the "Method" dropdown.
var m steward.Method
ma := m.GetMethodsAvailable()
values := []string{}
for k := range ma.Methodhandlers {
values = append(values, string(k))
}
// Create the individual form items, and append them to the
// formItems slice to be drawn later.
{
label := "startProc Method"
item := tview.NewDropDown()
item.SetLabel(label).SetOptions(values, nil).SetLabelWidth(25)
formItems = append(formItems, formItem{label: label, formItem: item})
}
return formItems
}()
operationFormItems["stopProc"] = func() []formItem {
formItems := []formItem{}
// RecevingNode node `json:"receivingNode"`
// Method Method `json:"method"`
// Kind processKind `json:"kind"`
// ID int `json:"id"`
// Get nodes from file.
values, err = getNodeNames("nodeslist.cfg")
if err != nil {
fmt.Fprintf(p.logForm, "%v: error: unable to get nodes\n", time.Now().Format("Mon Jan _2 15:04:05 2006"))
return nil
}
{
label := "stopProc ToNode"
item := tview.NewDropDown()
item.SetLabel(label).SetOptions(values, nil).SetLabelWidth(25)
formItems = append(formItems, formItem{label: label, formItem: item})
}
// Get values values to be used for the "Method" dropdown.
var m steward.Method
ma := m.GetMethodsAvailable()
values := []string{}
for k := range ma.Methodhandlers {
values = append(values, string(k))
}
// Create the individual form items, and append them to the
// formItems slice.
{
label := "stopProc Method"
item := tview.NewDropDown()
item.SetLabel(label).SetOptions(values, nil).SetLabelWidth(25)
formItems = append(formItems, formItem{label: label, formItem: item})
}
processKind := []string{"publisher", "subscriber"}
{
label := "stopProc processKind"
item := tview.NewDropDown()
item.SetLabel(label).SetOptions(processKind, nil).SetLabelWidth(25)
formItems = append(formItems, formItem{label: label, formItem: item})
}
{
label := "stopProc ID"
item := tview.NewInputField()
item.SetLabel(label).SetFieldWidth(30).SetLabelWidth(25)
formItems = append(formItems, formItem{label: label, formItem: item})
}
return formItems
}()
itemDraw := func(label string) {
// Delete previously drawn sub operation form items.
for _, vSlice := range operationFormItems {
for _, v := range vSlice {
i := p.msgInputForm.GetFormItemIndex(v.label)
if i > -1 {
p.msgInputForm.RemoveFormItem(i)
}
}
}
// Get and draw the form items to the form.
formItems := operationFormItems[label]
for _, v := range formItems {
p.msgInputForm.AddFormItem(v.formItem)
}
}
switch option {
case "ps":
itemDraw("ps")
case "startProc":
itemDraw("startProc")
case "stopProc":
itemDraw("stopProc")
default:
fmt.Fprintf(p.logForm, "%v: error: missing menu item for %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), option)
}
}
p.msgInputForm.AddDropDown(fieldName, values, 0, selectedFunc).SetItemPadding(1)
default:
// Add a no definition fields to the form if a a field within the
@ -305,11 +166,6 @@ func messageSlide(app *tview.Application) tview.Primitive {
//
// TODO: Should also add a write directly to socket here.
AddButton("generate to console", func() {
// ---
opCmdStartProc := steward.OpCmdStartProc{}
opCmdStopProc := steward.OpCmdStopProc{}
// ---
// fh, err := os.Create("message.json")
// if err != nil {
// log.Fatalf("error: failed to create test.log file: %v\n", err)
@ -379,25 +235,6 @@ func messageSlide(app *tview.Application) tview.Primitive {
m.Directory = value
case "FileName":
m.FileName = value
case "Operation":
// We need to check what type of operation it is, and pick
// the correct struct type, and fill it with values
switch value {
case "ps":
//TODO
case "startProc":
m.Operation = &opCmdStartProc
case "stopProc":
m.Operation = &opCmdStopProc
default:
m.Operation = nil
}
case "startProc Method":
if value == "" {
fmt.Fprintf(p.logForm, "%v : error: missing startProc Method\n", time.Now().Format("Mon Jan _2 15:04:05 2006"))
return
}
opCmdStartProc.Method = steward.Method(value)
default:
fmt.Fprintf(p.logForm, "%v : error: did not find case defenition for how to handle the \"%v\" within the switch statement\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), label)
@ -560,6 +397,4 @@ type msg struct {
// on a file being saved as the result of data being handled
// by a method handler.
FileName string `json:"fileName" yaml:"fileName"`
// operation are used to give an opCmd and opArg's.
Operation interface{} `json:"operation,omitempty"`
}