From b8a2d3d5e7572b5047cd2a709f7b49a283bde3c8 Mon Sep 17 00:00:00 2001 From: postmannen Date: Thu, 16 Jun 2022 23:32:44 +0200 Subject: [PATCH] removed copyFile and relay request types --- Dockerfile | 4 - README.md | 124 --------- configuration_flags.go | 30 --- errorkernel.go | 10 +- message_and_subject.go | 15 -- process.go | 41 --- processes.go | 47 ---- requests.go | 21 -- requests_copy.go | 4 +- requests_file_handling.go | 253 ------------------ requests_std.go | 144 ---------- ringbuffer.go | 72 ----- .../create-docker-compose-files/env.env.tpl | 3 +- server.go | 25 -- tui.go | 36 --- tui_msg.go | 2 - 16 files changed, 5 insertions(+), 826 deletions(-) diff --git a/Dockerfile b/Dockerfile index 173618b..1dd4cf8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -67,7 +67,6 @@ ENV START_SUB_REQ_HTTP_GET "" ENV START_SUB_REQ_HTTP_GET_SCHEDULED "" ENV START_SUB_REQ_TAIL_FILE "" ENV START_SUB_REQ_CLI_COMMAND_CONT "" -ENV START_SUB_REQ_RELAY "" CMD ["ash","-c","env CONFIGFOLDER=./etc/ /app/steward\ -ringBufferSize=$RING_BUFFER_SIZE\ @@ -107,8 +106,6 @@ CMD ["ash","-c","env CONFIGFOLDER=./etc/ /app/steward\ -startSubREQToFileAppend=$START_SUB_REQ_TO_FILE_APPEND\ -startSubREQToFile=$START_SUB_REQ_TO_FILE\ -startSubREQToFileNACK=$START_SUB_REQ_TO_FILE_NACK\ - -startSubREQCopyFileFrom=$START_SUB_REQ_COPY_FILE_FROM\ - -startSubREQCopyFileTo=$START_SUB_REQ_COPY_FILE_TO\ -startSubREQPing=$START_SUB_REQ_PING\ -startSubREQPong=$START_SUB_REQ_PONG\ -startSubREQCliCommand=$START_SUB_REQ_CLI_COMMAND\ @@ -117,5 +114,4 @@ CMD ["ash","-c","env CONFIGFOLDER=./etc/ /app/steward\ -startSubREQHttpGetScheduled=$START_SUB_REQ_HTTP_GET_SCHEDULED\ -startSubREQTailFile=$START_SUB_REQ_TAIL_FILE\ -startSubREQCliCommandCont=$START_SUB_REQ_CLI_COMMAND_CONT\ - -startSubREQRelay=$START_SUB_REQ_RELAY\ "] diff --git a/README.md b/README.md index 833484f..46dbe64 100644 --- a/README.md +++ b/README.md @@ -29,13 +29,6 @@ As long as you can do something as an operator on in a shell on a system you can - [Error messages from nodes](#error-messages-from-nodes) - [Message handling and threads](#message-handling-and-threads) - [Timeouts and retries for requests](#timeouts-and-retries-for-requests) - - [REQRelay](#reqrelay) - - [Relay Step 1](#relay-step-1) - - [Relay Step 2](#relay-step-2) - - [Relay Step 3](#relay-step-3) - - [Relay Step 4](#relay-step-4) - - [Relay Step 5](#relay-step-5) - - [Relay Step 6](#relay-step-6) - [Flags and configuration file](#flags-and-configuration-file) - [Schema for the messages to send into Steward via the API's](#schema-for-the-messages-to-send-into-steward-via-the-apis) - [Nats messaging timeouts](#nats-messaging-timeouts) @@ -56,7 +49,6 @@ As long as you can do something as an operator on in a shell on a system you can - [REQHttpGet](#reqhttpget) - [REQHttpGetScheduled](#reqhttpgetscheduled) - [REQHello](#reqhello) - - [REQCopyFileFrom](#reqcopyfilefrom) - [REQCopySrc](#reqcopysrc) - [REQErrorLog](#reqerrorlog) - [Request Methods used for reply messages](#request-methods-used-for-reply-messages) @@ -285,75 +277,6 @@ In the above example, the values set meaning: If no timeout are specified in a message the defaults specified in the **etc/config.yaml** are used. -#### REQRelay - -Instead of injecting the new Requests on the central server, you can relay messages via another node as long as the nats-server authorization conf permits it. This is what REQRelay is for. - -This functionality can be thought of as attaching a terminal to a Steward instance. Instead of injecting messages directly on for example the central Steward instance you can use another Steward instance and relay messages via the central instance. - -Example configuration of relay authorization for nats-server can be found in the [doc folder](doc/). - -Example: - -```json -[ - { - "directory":"/var/tail-logs/", - "fileName": "my-wifi.log", - "toNode": "node1", - "relayViaNode": "central", - "relayReplyMethod": "REQToConsole", - "methodArgs": ["bash","-c","tail -f /var/log/wifi.log"], - "method":"REQCliCommandCont", - "replyMethod":"REQToFileAppend", - "ACKTimeout":5, - "retries":3, - "replyACKTimeout":5, - "replyRetries":3, - "methodTimeout": 10 - } -] -``` - -```text - 1 2 3 - ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ -------------▷│ │------------▷│ │------------▷│ │ - │ node1 │ │ central │ │ node2 │ -◁------------│ │◁------------│ │◁------------│ │ - └─────────────┘ └─────────────┘ └─────────────┘ - 6 5 4 -``` - -Steps Explained: - -##### Relay Step 1 - -- The **relayViaNode** field of the message is checked, and If set the message will be encapsulated within a **REQRelayInitial** message where the original message field values are kept, and put back on the message queue on **node1**. -- The new **REQRelayInitial** message will again be picked up on **node1** and handled by the **REQRelayInitial handler**. -- The **REQRelayInitial handler** will set the message method to **REQRelay**, and forward the message to the node value in the **relayViaNode** field. - -##### Relay Step 2 - -- On **central** the **REQRelay method handler** will recreate the **original** message, and forward it to **node2**. - -##### Relay Step 3 - -- **Node2** receives the request, and executes the **original** method with the arguments specified. - -##### Relay Step 4 - -- The result is sent back to **central** in the form of a normal reply message. - -##### Relay Step 5 - -- When the **reply** message is received on central a copy of the **reply** message will be created , and forwarded to **node1** where it originated. -- The the **original reply method** `"replyMethod":"REQToFileAppend"` is handled on central. - -##### Relay Step 6 - -- On **node1** the **relayReplyMethod** is checked for how to handle the message. In this case it is printed to the consoles STDOUT. - ### Flags and configuration file Steward supports both the use of flags with values set at startup, and the use of a config file. @@ -386,8 +309,6 @@ Steward supports both the use of flags with values set at startup, and the use o - replyMethodTimeout : `int` - directory : `string` - fileName : `string` -- RelayViaNode: `string` -- RelayReplyMethod: `string` ### Nats messaging timeouts @@ -674,29 +595,6 @@ All nodes have the flag option to start sending Hello message to the central ser ] ``` -#### REQCopyFileFrom - -Copy a file from one node to another node. - -- Source node to copy from is specified in the toNode/toNodes field -- The file to copy and the destination node is specified in the **methodArgs** field: - 1. The first field is the full path of the source file. - 2. The second field is the destination node for where to copy the file to. - 3. The third field is the full path for where to write the copied file. - -```json -[ - { - "directory": "copy", - "fileName": "copy.log", - "toNodes": ["central"], - "method":"REQCopyFileFrom", - "methodArgs": ["./tmp2.txt","ship2","/tmp/tmp2.txt"], - "replyMethod":"REQToFileAppend" - } -] -``` - #### REQCopySrc Copy a file from one node to another node. @@ -1465,10 +1363,6 @@ StartSubREQToFileAppend bool StartSubREQToFile bool // Subscriber for writing to file without ACK StartSubREQToFileNACK bool -// Subscriber for reading files to copy -StartSubREQCopyFileFrom bool -// Subscriber for writing copied files to disk -StartSubREQCopyFileTo bool // Subscriber for Echo Request StartSubREQPing bool // Subscriber for Echo Reply @@ -1485,8 +1379,6 @@ StartSubREQHttpGetScheduled bool StartSubREQTailFile bool // Subscriber for continously delivery of output from cli commands. StartSubREQCliCommandCont bool -// Subscriber for relay messages. -StartSubREQRelay bool ``` ## Appendix-B @@ -1550,20 +1442,4 @@ FileName string `json:"fileName" yaml:"fileName"` // generated and we also need a copy of the details of the the // initial request message. PreviousMessage *Message -// The node to relay the message via. -RelayViaNode Node `json:"relayViaNode" yaml:"relayViaNode"` -// The node where the relayed message originated, and where we want -// to send back the end result. -RelayFromNode Node `json:"relayFromNode" yaml:"relayFromNode"` -// The original value of the ToNode field of the original message. -RelayToNode Node `json:"relayToNode" yaml:"relayToNode"` -// The original method of the message. -RelayOriginalMethod Method `json:"relayOriginalMethod" yaml:"relayOriginalMethod"` -// The method to use when the reply of the relayed message came -// back to where originated from. -RelayReplyMethod Method `json:"relayReplyMethod" yaml:"relayReplyMethod"` -// done is used to signal when a message is fully processed. -// This is used for signaling back to the ringbuffer that we are -// done with processing a message, and the message can be removed -// from the ringbuffer and into the time series log. ``` diff --git a/configuration_flags.go b/configuration_flags.go index 2317f48..161aee0 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -110,10 +110,6 @@ type Configuration struct { // Subscriber for writing to file without ACK StartSubREQToFileNACK bool // Subscriber for reading files to copy - StartSubREQCopyFileFrom bool - // Subscriber for writing copied files to disk - StartSubREQCopyFileTo bool - // Subscriber for reading files to copy StartSubREQCopySrc bool // Subscriber for writing copied files to disk StartSubREQCopyDst bool @@ -133,8 +129,6 @@ type Configuration struct { StartSubREQTailFile bool // Subscriber for continously delivery of output from cli commands. StartSubREQCliCommandCont bool - // Subscriber for relay messages. - StartSubREQRelay bool } // ConfigurationFromFile should have the same structure as @@ -186,8 +180,6 @@ type ConfigurationFromFile struct { StartSubREQToFileAppend *bool StartSubREQToFile *bool StartSubREQToFileNACK *bool - StartSubREQCopyFileFrom *bool - StartSubREQCopyFileTo *bool StartSubREQCopySrc *bool StartSubREQCopyDst *bool StartSubREQPing *bool @@ -198,7 +190,6 @@ type ConfigurationFromFile struct { StartSubREQHttpGetScheduled *bool StartSubREQTailFile *bool StartSubREQCliCommandCont *bool - StartSubREQRelay *bool } // NewConfiguration will return a *Configuration. @@ -254,8 +245,6 @@ func newConfigurationDefaults() Configuration { StartSubREQToFileAppend: true, StartSubREQToFile: true, StartSubREQToFileNACK: true, - StartSubREQCopyFileFrom: true, - StartSubREQCopyFileTo: true, StartSubREQCopySrc: true, StartSubREQCopyDst: true, StartSubREQPing: true, @@ -266,7 +255,6 @@ func newConfigurationDefaults() Configuration { StartSubREQHttpGetScheduled: true, StartSubREQTailFile: true, StartSubREQCliCommandCont: true, - StartSubREQRelay: false, } return c } @@ -497,16 +485,6 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration { } else { conf.StartSubREQToFileNACK = *cf.StartSubREQToFileNACK } - if cf.StartSubREQCopyFileFrom == nil { - conf.StartSubREQCopyFileFrom = cd.StartSubREQCopyFileFrom - } else { - conf.StartSubREQCopyFileFrom = *cf.StartSubREQCopyFileFrom - } - if cf.StartSubREQCopyFileTo == nil { - conf.StartSubREQCopyFileTo = cd.StartSubREQCopyFileTo - } else { - conf.StartSubREQCopyFileTo = *cf.StartSubREQCopyFileTo - } if cf.StartSubREQCopySrc == nil { conf.StartSubREQCopySrc = cd.StartSubREQCopySrc } else { @@ -557,11 +535,6 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration { } else { conf.StartSubREQCliCommandCont = *cf.StartSubREQCliCommandCont } - if cf.StartSubREQRelay == nil { - conf.StartSubREQRelay = cd.StartSubREQRelay - } else { - conf.StartSubREQRelay = *cf.StartSubREQRelay - } return conf } @@ -643,8 +616,6 @@ func (c *Configuration) CheckFlags() error { flag.BoolVar(&c.StartSubREQToFileAppend, "startSubREQToFileAppend", fc.StartSubREQToFileAppend, "true/false") flag.BoolVar(&c.StartSubREQToFile, "startSubREQToFile", fc.StartSubREQToFile, "true/false") flag.BoolVar(&c.StartSubREQToFileNACK, "startSubREQToFileNACK", fc.StartSubREQToFileNACK, "true/false") - flag.BoolVar(&c.StartSubREQCopyFileFrom, "startSubREQCopyFileFrom", fc.StartSubREQCopyFileFrom, "true/false") - flag.BoolVar(&c.StartSubREQCopyFileTo, "startSubREQCopyFileTo", fc.StartSubREQCopyFileTo, "true/false") flag.BoolVar(&c.StartSubREQCopySrc, "startSubREQCopySrc", fc.StartSubREQCopySrc, "true/false") flag.BoolVar(&c.StartSubREQCopyDst, "startSubREQCopyDst", fc.StartSubREQCopyDst, "true/false") flag.BoolVar(&c.StartSubREQPing, "startSubREQPing", fc.StartSubREQPing, "true/false") @@ -655,7 +626,6 @@ func (c *Configuration) CheckFlags() error { flag.BoolVar(&c.StartSubREQHttpGetScheduled, "startSubREQHttpGetScheduled", fc.StartSubREQHttpGetScheduled, "true/false") flag.BoolVar(&c.StartSubREQTailFile, "startSubREQTailFile", fc.StartSubREQTailFile, "true/false") flag.BoolVar(&c.StartSubREQCliCommandCont, "startSubREQCliCommandCont", fc.StartSubREQCliCommandCont, "true/false") - flag.BoolVar(&c.StartSubREQRelay, "startSubREQRelay", fc.StartSubREQRelay, "true/false") purgeBufferDB := flag.Bool("purgeBufferDB", false, "true/false, purge the incoming buffer db and all it's state") diff --git a/errorkernel.go b/errorkernel.go index dd0bd17..74ee808 100644 --- a/errorkernel.go +++ b/errorkernel.go @@ -68,14 +68,8 @@ func (e *errorKernel) start(ringBufferBulkInCh chan<- []subjectAndMessage) error } sendErrorOrInfo := func(errEvent errorEvent) { - var er string - // Decide what extra information to add to the error message. - switch { - case errEvent.message.RelayFromNode != "": - er = fmt.Sprintf("%v, node: %v, relayFromNode: %v, %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), errEvent.process.node, errEvent.message.RelayFromNode, errEvent.err) - default: - er = fmt.Sprintf("%v, node: %v, %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), errEvent.process.node, errEvent.err) - } + + er := fmt.Sprintf("%v, node: %v, %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), errEvent.process.node, errEvent.err) sam := subjectAndMessage{ Subject: newSubject(REQErrorLog, "errorCentral"), diff --git a/message_and_subject.go b/message_and_subject.go index 3eb7781..7e1a68c 100644 --- a/message_and_subject.go +++ b/message_and_subject.go @@ -73,21 +73,6 @@ type Message struct { // initial request message. PreviousMessage *Message - // The node to relay the message via. - RelayViaNode Node `json:"relayViaNode" yaml:"relayViaNode"` - // The original value of the RelayViaNode. - RelayOriginalViaNode Node `json:"relayOriginalViaNode" yaml:"relayOriginalViaNode"` - // The node where the relayed message originated, and where we want - // to send back the end result. - RelayFromNode Node `json:"relayFromNode" yaml:"relayFromNode"` - // The original value of the ToNode field of the original message. - RelayToNode Node `json:"relayToNode" yaml:"relayToNode"` - // The original method of the message. - RelayOriginalMethod Method `json:"relayOriginalMethod" yaml:"relayOriginalMethod"` - // The method to use when the reply of the relayed message came - // back to where originated from. - RelayReplyMethod Method `json:"relayReplyMethod" yaml:"relayReplyMethod"` - // done is used to signal when a message is fully processed. // This is used for signaling back to the ringbuffer that we are // done with processing a message, and the message can be removed diff --git a/process.go b/process.go index a17fbac..d5462b6 100644 --- a/process.go +++ b/process.go @@ -462,47 +462,6 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, } } - // Send final reply for a relayed message back to the originating node. - // - // Check if the previous message was a relayed message, and if true - // make a copy of the current message where the to field is set to - // the value of the previous message's RelayFromNode field, so we - // also can send the a copy of the reply back to where it originated. - if message.PreviousMessage != nil && message.PreviousMessage.RelayOriginalViaNode != "" { - - // make a copy of the message - msgCopy := message - msgCopy.ToNode = msgCopy.PreviousMessage.RelayFromNode - - // We set the replyMethod of the initial message. - // If no RelayReplyMethod was found, we default to the reply - // method of the previous message. - switch { - case msgCopy.PreviousMessage.RelayReplyMethod == "": - er := fmt.Errorf("error: subscriberHandler: no PreviousMessage.RelayReplyMethod found, defaulting to the reply method of previous message: %v ", msgCopy) - p.errorKernel.errSend(p, message, er) - - msgCopy.Method = msgCopy.PreviousMessage.ReplyMethod - - case msgCopy.PreviousMessage.RelayReplyMethod != "": - msgCopy.Method = msgCopy.PreviousMessage.RelayReplyMethod - } - - // Reset the previousMessage relay fields so the message don't loop. - message.PreviousMessage.RelayViaNode = "" - message.PreviousMessage.RelayOriginalViaNode = "" - - // Create a SAM for the msg copy that will be sent back the where the - // relayed message originated from. - sam, err := newSubjectAndMessage(msgCopy) - if err != nil { - er := fmt.Errorf("error: subscriberHandler: newSubjectAndMessage : %v, message copy: %v", err, msgCopy) - p.errorKernel.errSend(p, message, er) - } - - p.toRingbufferCh <- []subjectAndMessage{sam} - } - // Check if it is an ACK or NACK message, and do the appropriate action accordingly. // // With ACK messages Steward will keep the state of the message delivery, and try to diff --git a/processes.go b/processes.go index 8df5581..ffad931 100644 --- a/processes.go +++ b/processes.go @@ -133,14 +133,6 @@ func (p *processes) Start(proc process) { proc.startup.subREQToFileNACK(proc) } - if proc.configuration.StartSubREQCopyFileFrom { - proc.startup.subREQCopyFileFrom(proc) - } - - if proc.configuration.StartSubREQCopyFileTo { - proc.startup.subREQCopyFileTo(proc) - } - if proc.configuration.StartSubREQCopySrc { proc.startup.subREQCopySrc(proc) } @@ -232,12 +224,6 @@ func (p *processes) Start(proc process) { proc.startup.subREQCliCommandCont(proc) } - if proc.configuration.StartSubREQRelay { - proc.startup.subREQRelay(proc) - } - - proc.startup.subREQRelayInitial(proc) - proc.startup.subREQPublicKey(proc) } @@ -672,22 +658,6 @@ func (s startup) subREQToFileNACK(p process) { go proc.spawnWorker() } -func (s startup) subREQCopyFileFrom(p process) { - log.Printf("Starting copy file from subscriber: %#v\n", p.node) - sub := newSubject(REQCopyFileFrom, string(p.node)) - proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) - - go proc.spawnWorker() -} - -func (s startup) subREQCopyFileTo(p process) { - log.Printf("Starting copy file to subscriber: %#v\n", p.node) - sub := newSubject(REQCopyFileTo, string(p.node)) - proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) - - go proc.spawnWorker() -} - func (s startup) subREQCopySrc(p process) { log.Printf("Starting copy src subscriber: %#v\n", p.node) sub := newSubject(REQCopySrc, string(p.node)) @@ -728,23 +698,6 @@ func (s startup) subREQCliCommandCont(p process) { go proc.spawnWorker() } -func (s startup) subREQRelay(p process) { - nodeWithRelay := fmt.Sprintf("*.%v", p.node) - log.Printf("Starting Relay: %#v\n", nodeWithRelay) - sub := newSubject(REQRelay, string(nodeWithRelay)) - proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) - - go proc.spawnWorker() -} - -func (s startup) subREQRelayInitial(p process) { - log.Printf("Starting Relay Initial: %#v\n", p.node) - sub := newSubject(REQRelayInitial, string(p.node)) - proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) - - go proc.spawnWorker() -} - func (s startup) subREQPublicKey(p process) { log.Printf("Starting get Public Key subscriber: %#v\n", p.node) sub := newSubject(REQPublicKey, string(p.node)) diff --git a/requests.go b/requests.go index d981488..09fef98 100644 --- a/requests.go +++ b/requests.go @@ -99,11 +99,6 @@ const ( REQToFile Method = "REQToFile" // REQToFileNACK same as REQToFile but NACK. REQToFileNACK Method = "REQToFileNACK" - // Read the source file to be copied to some node. - REQCopyFileFrom Method = "REQCopyFileFrom" - // Write the destination copied to some node. - REQCopyFileTo Method = "REQCopyFileTo" - // Initial request for file copying. // Initiated by the user. REQCopySrc Method = "REQCopySrc" // Initial request for file copying. @@ -130,10 +125,6 @@ const ( REQHttpGetScheduled Method = "REQHttpGetScheduled" // Tail file REQTailFile Method = "REQTailFile" - // Write to steward socket - REQRelay Method = "REQRelay" - // The method handler for the first step in a relay chain. - REQRelayInitial Method = "REQRelayInitial" // REQNone is used when there should be no reply. REQNone Method = "REQNone" // REQTest is used only for testing to be able to grab the output @@ -228,12 +219,6 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { REQToFileNACK: methodREQToFile{ event: EventNACK, }, - REQCopyFileFrom: methodREQCopyFileFrom{ - event: EventACK, - }, - REQCopyFileTo: methodREQCopyFileTo{ - event: EventACK, - }, REQCopySrc: methodREQCopySrc{ event: EventACK, }, @@ -267,12 +252,6 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { REQTailFile: methodREQTailFile{ event: EventACK, }, - REQRelay: methodREQRelay{ - event: EventACK, - }, - REQRelayInitial: methodREQRelayInitial{ - event: EventACK, - }, REQPublicKey: methodREQPublicKey{ event: EventACK, }, diff --git a/requests_copy.go b/requests_copy.go index 863b507..1e150ad 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -159,7 +159,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ // Get the file permissions fileInfo, err := os.Stat(SrcFilePath) if err != nil { - // errCh <- fmt.Errorf("error: methodREQCopyFile: failed to open file: %v, %v", SrcFilePath, err) + // errCh <- fmt.Errorf("error: methodREQCopySrc: failed to open file: %v, %v", SrcFilePath, err) log.Printf("error: copySrcSubProcFunc: failed to stat file: %v\n", err) return } @@ -361,7 +361,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel fh, err := os.Open(cia.SrcFilePath) if err != nil { - // errCh <- fmt.Errorf("error: methodREQCopyFile: failed to open file: %v, %v", SrcFilePath, err) + // errCh <- fmt.Errorf("error: copySrcSubProcFunc: failed to open file: %v, %v", SrcFilePath, err) log.Fatalf("error: copySrcSubProcFunc: failed to open file: %v\n", err) return nil } diff --git a/requests_file_handling.go b/requests_file_handling.go index 699229e..8dbcc9b 100644 --- a/requests_file_handling.go +++ b/requests_file_handling.go @@ -1,13 +1,9 @@ package steward import ( - "context" "fmt" - "io" "os" - "path" "path/filepath" - "sync" "github.com/hpcloud/tail" ) @@ -114,255 +110,6 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([] return ackMsg, nil } -// ---- - -type methodREQCopyFileFrom struct { - event Event -} - -func (m methodREQCopyFileFrom) getKind() Event { - return m.event -} - -// Handle writing to a file. Will truncate any existing data if the file did already -// exist. -func (m methodREQCopyFileFrom) handler(proc process, message Message, node string) ([]byte, error) { - - proc.processes.wg.Add(1) - go func() { - defer proc.processes.wg.Done() - - switch { - case len(message.MethodArgs) < 3: - er := fmt.Errorf("error: methodREQCopyFileFrom: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath") - proc.errorKernel.errSend(proc, message, er) - - return - } - - SrcFilePath := message.MethodArgs[0] - DstNode := message.MethodArgs[1] - DstFilePath := message.MethodArgs[2] - - // Get a context with the timeout specified in message.MethodTimeout. - ctx, cancel := getContextForMethodTimeout(proc.ctx, message) - defer cancel() - - outCh := make(chan []byte) - errCh := make(chan error) - - // Read the file, and put the result on the out channel to be sent when done reading. - proc.processes.wg.Add(1) - go copyFileFrom(ctx, &proc.processes.wg, SrcFilePath, errCh, outCh) - - // Wait here until we got the data to send, then create a new message - // and send it. - // Also checking the ctx.Done which calls Cancel will allow us to - // kill all started go routines started by this message. - select { - case <-ctx.Done(): - er := fmt.Errorf("error: methodREQCopyFile: got <-ctx.Done(): %v", message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) - - return - case er := <-errCh: - proc.errorKernel.errSend(proc, message, er) - - return - case out := <-outCh: - dstDir := filepath.Dir(DstFilePath) - dstFile := filepath.Base(DstFilePath) - - // Prepare for sending a new message with the output - - // Copy the original message to get the defaults for timeouts etc, - // and set new values for fields to change. - msg := message - msg.ToNode = Node(DstNode) - //msg.Method = REQToFile - msg.Method = REQCopyFileTo - msg.Data = out - msg.Directory = dstDir - msg.FileName = dstFile - - // Create SAM and put the message on the send new message channel. - - sam, err := newSubjectAndMessage(msg) - if err != nil { - er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) - proc.errorKernel.errSend(proc, message, er) - } - - proc.toRingbufferCh <- []subjectAndMessage{sam} - - replyData := fmt.Sprintf("info: succesfully read the file %v, and sent the content to %v\n", SrcFilePath, DstNode) - - newReplyMessage(proc, message, []byte(replyData)) - } - - }() - - ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) - return ackMsg, nil -} - -// copyFileFrom will read a file to be copied from the specified SrcFilePath. -// The result of be delivered on the provided outCh. -func copyFileFrom(ctx context.Context, wg *sync.WaitGroup, SrcFilePath string, errCh chan error, outCh chan []byte) { - defer wg.Done() - - const natsMaxMsgSize = 1000000 - - fi, err := os.Stat(SrcFilePath) - - // Check if the src file exists, and that it is not bigger than - // the default limit used by nats which is 1MB. - switch { - case os.IsNotExist(err): - errCh <- fmt.Errorf("error: methodREQCopyFile: src file not found: %v", SrcFilePath) - return - case fi.Size() > natsMaxMsgSize: - errCh <- fmt.Errorf("error: methodREQCopyFile: src file to big. max size: %v", natsMaxMsgSize) - return - } - - fh, err := os.Open(SrcFilePath) - if err != nil { - errCh <- fmt.Errorf("error: methodREQCopyFile: failed to open file: %v, %v", SrcFilePath, err) - return - } - - b, err := io.ReadAll(fh) - if err != nil { - errCh <- fmt.Errorf("error: methodREQCopyFile: failed to read file: %v, %v", SrcFilePath, err) - return - } - - select { - case outCh <- b: - // fmt.Printf(" * DEBUG: after io.ReadAll: outCh <- b\n") - case <-ctx.Done(): - return - } -} - -// ---- - -type methodREQCopyFileTo struct { - event Event -} - -func (m methodREQCopyFileTo) getKind() Event { - return m.event -} - -// Handle writing to a file. Will truncate any existing data if the file did already -// exist. -// Same as the REQToFile, but this requst type don't use the default data folder path -// for where to store files or add information about node names. -// This method also sends a msgReply back to the publisher if the method was done -// successfully, where REQToFile do not. -// This method will truncate and overwrite any existing files. -func (m methodREQCopyFileTo) handler(proc process, message Message, node string) ([]byte, error) { - - proc.processes.wg.Add(1) - go func() { - defer proc.processes.wg.Done() - - // Get a context with the timeout specified in message.MethodTimeout. - ctx, cancel := getContextForMethodTimeout(proc.ctx, message) - defer cancel() - - // Put data that should be the result of the action done in the inner - // go routine on the outCh. - outCh := make(chan []byte) - // Put errors from the inner go routine on the errCh. - errCh := make(chan error) - - proc.processes.wg.Add(1) - go func() { - defer proc.processes.wg.Done() - - // --- - switch { - case len(message.MethodArgs) < 3: - er := fmt.Errorf("error: methodREQCopyFileTo: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath") - proc.errorKernel.errSend(proc, message, er) - - return - } - - // Pick up the values for the directory and filename for where - // to store the file. - DstFilePath := message.MethodArgs[2] - dstDir := filepath.Dir(DstFilePath) - dstFile := filepath.Base(DstFilePath) - - fileRealPath := path.Join(dstDir, dstFile) - - // Check if folder structure exist, if not create it. - if _, err := os.Stat(dstDir); os.IsNotExist(err) { - err := os.MkdirAll(dstDir, 0700) - if err != nil { - er := fmt.Errorf("failed to create toFile directory tree: subject:%v, folderTree: %v, %v", proc.subject, dstDir, err) - errCh <- er - return - } - - { - er := fmt.Errorf("info: MethodREQCopyFileTo: Creating folders %v", dstDir) - proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration) - } - } - - // Open file and write data. Truncate and overwrite any existing files. - file := filepath.Join(dstDir, dstFile) - f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755) - if err != nil { - er := fmt.Errorf("failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err) - errCh <- er - return - } - defer f.Close() - - _, err = f.Write(message.Data) - f.Sync() - if err != nil { - er := fmt.Errorf("failed to write to file: file: %v, error: %v", file, err) - errCh <- er - } - - // All went ok, send a signal to the outer select statement. - outCh <- []byte(fileRealPath) - - // --- - - }() - - // Wait for messages received from the inner go routine. - select { - case <-ctx.Done(): - er := fmt.Errorf("error: methodREQCopyFileTo: got <-ctx.Done(): %v", message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) - return - - case err := <-errCh: - er := fmt.Errorf("error: methodREQCopyFileTo: %v", err) - proc.errorKernel.errSend(proc, message, er) - return - - case out := <-outCh: - replyData := fmt.Sprintf("info: succesfully created and wrote the file %v\n", out) - newReplyMessage(proc, message, []byte(replyData)) - return - } - - }() - - ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) - return ackMsg, nil -} - // --- methodREQTailFile type methodREQTailFile struct { diff --git a/requests_std.go b/requests_std.go index 2c45884..03b77bd 100644 --- a/requests_std.go +++ b/requests_std.go @@ -233,150 +233,6 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by // --- -type methodREQRelayInitial struct { - event Event -} - -func (m methodREQRelayInitial) getKind() Event { - return m.event -} - -// Handler to relay messages via a host. -func (m methodREQRelayInitial) handler(proc process, message Message, node string) ([]byte, error) { - proc.processes.wg.Add(1) - go func() { - defer proc.processes.wg.Done() - - // Get a context with the timeout specified in message.MethodTimeout. - ctx, cancel := getContextForMethodTimeout(proc.ctx, message) - defer cancel() - - outCh := make(chan []byte) - errCh := make(chan error) - nothingCh := make(chan struct{}, 1) - - var out []byte - - // If the actual Method for the message is REQCopyFileFrom we need to - // do the actual file reading here so we can fill the data field of the - // message with the content of the file before relaying it. - switch { - case message.RelayOriginalMethod == REQCopyFileFrom: - switch { - case len(message.MethodArgs) < 3: - er := fmt.Errorf("error: methodREQRelayInitial: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath") - proc.errorKernel.errSend(proc, message, er) - - return - } - - SrcFilePath := message.MethodArgs[0] - //DstFilePath := message.MethodArgs[2] - - // Read the file, and put the result on the out channel to be sent when done reading. - proc.processes.wg.Add(1) - go copyFileFrom(ctx, &proc.processes.wg, SrcFilePath, errCh, outCh) - - // Since we now have read the source file we don't need the REQCopyFileFrom - // request method anymore, so we change the original method of the message - // so it will write the data after the relaying. - //dstDir := filepath.Dir(DstFilePath) - //dstFile := filepath.Base(DstFilePath) - message.RelayOriginalMethod = REQCopyFileTo - //message.FileName = dstFile - //message.Directory = dstDir - default: - // No request type that need special handling if relayed, so we should signal that - // there is nothing to do for the select below. - // We need to do this signaling in it's own go routine here, so we don't block here - // since the select below is in the same function. - go func() { - nothingCh <- struct{}{} - }() - } - - select { - case <-ctx.Done(): - er := fmt.Errorf("error: methodREQRelayInitial: CopyFromFile: got <-ctx.Done(): %v", message.MethodArgs) - proc.errorKernel.errSend(proc, message, er) - - return - case er := <-errCh: - proc.errorKernel.errSend(proc, message, er) - - return - case <-nothingCh: - // Do nothing. - case out = <-outCh: - - } - - // relay the message to the actual host here by prefixing the the RelayToNode - // to the subject. - relayTo := fmt.Sprintf("%v.%v", message.RelayToNode, message.RelayOriginalViaNode) - // message.ToNode = message.RelayOriginalViaNode - message.ToNode = Node(relayTo) - message.FromNode = Node(node) - message.Method = REQRelay - message.Data = out - - sam, err := newSubjectAndMessage(message) - if err != nil { - er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) - proc.errorKernel.errSend(proc, message, er) - } - - proc.toRingbufferCh <- []subjectAndMessage{sam} - }() - - // Send back an ACK message. - ackMsg := []byte("confirmed REQRelay from: " + node + ": " + fmt.Sprint(message.ID)) - return ackMsg, nil -} - -// ---- - -type methodREQRelay struct { - event Event -} - -func (m methodREQRelay) getKind() Event { - return m.event -} - -// Handler to relay messages via a host. -func (m methodREQRelay) handler(proc process, message Message, node string) ([]byte, error) { - // relay the message here to the actual host here. - - proc.processes.wg.Add(1) - go func() { - defer proc.processes.wg.Done() - - message.ToNode = message.RelayToNode - message.FromNode = Node(node) - message.Method = message.RelayOriginalMethod - - sam, err := newSubjectAndMessage(message) - if err != nil { - er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) - proc.errorKernel.errSend(proc, message, er) - - return - } - - select { - case proc.toRingbufferCh <- []subjectAndMessage{sam}: - case <-proc.ctx.Done(): - } - }() - - // Send back an ACK message. - ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) - return ackMsg, nil -} - -// --- - type methodREQToConsole struct { event Event } diff --git a/ringbuffer.go b/ringbuffer.go index 63afdde..5dbf801 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -304,78 +304,6 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB // it out of the K/V Store. r.deleteKeyFromBucket(r.samValueBucket, strconv.Itoa(v.ID)) - //m := v.Data.Message - //t := time.Now().Format("Mon Jan _2 15:04:05 2006") - - //tmpout := os.Stdout - - //_ = fmt.Sprintf("%v\n", t) - //_ = fmt.Sprintf("%v\n", m.ID) - //_ = fmt.Sprintf("%v\n", m.ToNode) - //_ = fmt.Sprintf("%v\n", m.ToNodes) - //_ = fmt.Sprintf("%v\n", m.Data) - //_ = fmt.Sprintf("%v\n", m.Method) - //_ = fmt.Sprintf("%v\n", m.MethodArgs) - //_ = fmt.Sprintf("%v\n", m.ArgSignature) - //_ = fmt.Sprintf("%v\n", m.ReplyMethod) - //_ = fmt.Sprintf("%v\n", m.ReplyMethodArgs) - //_ = fmt.Sprintf("%v\n", m.IsReply) - //_ = fmt.Sprintf("%v\n", m.FromNode) - //_ = fmt.Sprintf("%v\n", m.ACKTimeout) - //_ = fmt.Sprintf("%v\n", m.Retries) - //_ = fmt.Sprintf("%v\n", m.ReplyACKTimeout) - //_ = fmt.Sprintf("%v\n", m.ReplyRetries) - //_ = fmt.Sprintf("%v\n", m.MethodTimeout) - //_ = fmt.Sprintf("%v\n", m.ReplyMethodTimeout) - //_ = fmt.Sprintf("%v\n", m.Directory) - //_ = fmt.Sprintf("%v\n", m.FileName) - //_ = fmt.Sprintf("%v\n", m.PreviousMessage) - //_ = fmt.Sprintf("%v\n", m.RelayViaNode) - //_ = fmt.Sprintf("%v\n", m.RelayOriginalViaNode) - //_ = fmt.Sprintf("%v\n", m.RelayFromNode) - //_ = fmt.Sprintf("%v\n", m.RelayToNode) - //_ = fmt.Sprintf("%v\n", m.RelayOriginalMethod) - //_ = fmt.Sprintf("%v\n", m.RelayReplyMethod) - //_ = fmt.Sprintf("%v\n", m.done) - - //str := fmt.Sprintln( - // t, - // m.ID, - // m.ToNode, - // m.ToNodes, - // m.Data, - // m.Method, - // m.MethodArgs, - // m.ArgSignature, - // m.ReplyMethod, - // m.ReplyMethodArgs, - // m.IsReply, - // m.FromNode, - // m.ACKTimeout, - // m.Retries, - // m.ReplyACKTimeout, - // m.ReplyRetries, - // m.MethodTimeout, - // m.ReplyMethodTimeout, - // m.Directory, - // m.FileName, - // m.PreviousMessage, - // m.RelayViaNode, - // m.RelayOriginalViaNode, - // m.RelayFromNode, - // m.RelayToNode, - // m.RelayOriginalMethod, - // m.RelayReplyMethod, - // m.done, - //) - - //r.permStore <- fmt.Sprintf("%v\n", str) - - // NB: Removed this one since it creates a data race with the storing of the hash value in - // the methodREQKeysDeliverUpdate. Sorted by splitting up the sprint below with the sprint - // above for now, but should investigate further what might be the case here, since the - // message have no reference to the proc and should in theory not create a race. - // js, err := json.Marshal(msgForPermStore) if err != nil { er := fmt.Errorf("error:fillBuffer: json marshaling: %v", err) diff --git a/scripts/steward/create-docker-compose-files/env.env.tpl b/scripts/steward/create-docker-compose-files/env.env.tpl index 012ea57..da79f7e 100644 --- a/scripts/steward/create-docker-compose-files/env.env.tpl +++ b/scripts/steward/create-docker-compose-files/env.env.tpl @@ -46,5 +46,4 @@ START_SUB_REQ_TO_CONSOLE=true START_SUB_REQ_HTTP_GET=true START_SUB_REQ_HTTP_GET_SCHEDULED=true START_SUB_REQ_TAIL_FILE=true -START_SUB_REQ_CLI_COMMAND_CONT=true -START_SUB_REQ_RELAY=true \ No newline at end of file +START_SUB_REQ_CLI_COMMAND_CONT=true \ No newline at end of file diff --git a/server.go b/server.go index 3ce5b9f..747544b 100644 --- a/server.go +++ b/server.go @@ -461,31 +461,6 @@ func (s *server) routeMessagesToProcess(dbFileName string) { m := sam.Message - // Check if it is a relay message - if m.RelayViaNode != "" && m.RelayViaNode != Node(s.nodeName) { - - // Keep the original values. - m.RelayFromNode = m.FromNode - m.RelayToNode = m.ToNode - m.RelayOriginalViaNode = m.RelayViaNode - m.RelayOriginalMethod = m.Method - - // Convert it to a relay initial message. - m.Method = REQRelayInitial - // Set the toNode of the message to this host, so we send - // it to ourselves again and pick it up with the subscriber - // for the REQReplyInitial handler method. - m.ToNode = Node(s.nodeName) - - // We are now done with the initial checking for if the new - // message is a relay message, so we empty the viaNode field - // so we don't end in an endless loop here. - // The value is stored in RelayOriginalViaNode for later use. - m.RelayViaNode = "" - - sam.Subject = newSubject(REQRelayInitial, string(s.nodeName)) - } - subjName := sam.Subject.name() pn := processNameGet(subjName, processKindPublisher) diff --git a/tui.go b/tui.go index bd5c92d..a866d21 100644 --- a/tui.go +++ b/tui.go @@ -338,37 +338,7 @@ func drawMessageInputFields(p slideMessageEdit, m tuiMessage) { } p.inputForm.AddInputField(fieldName, value, fieldWidth, nil, nil) - case "RelayViaNode": - // Get nodes from file. - values, err := getNodeNames("nodeslist.cfg") - if err != nil { - log.Printf("error: unable to open file: %v\n", err) - os.Exit(1) - } - if m.RelayViaNode != nil && *m.RelayViaNode != "" { - tmp := []string{string(*m.RelayViaNode)} - tmp = append(tmp, values...) - values = tmp - } - - p.inputForm.AddDropDown(fieldName, values, 0, nil).SetItemPadding(1) - //c.msgForm.AddDropDown(mRefVal.Type().Field(i).Name, values, 0, nil).SetItemPadding(1) - case "RelayReplyMethod": - var v Method - rm := v.GetReplyMethods() - values := []string{} - for _, k := range rm { - values = append(values, string(k)) - } - - if m.RelayReplyMethod != nil && *m.RelayReplyMethod != "" { - tmp := []string{string(*m.RelayReplyMethod)} - tmp = append(tmp, values...) - values = tmp - } - - p.inputForm.AddDropDown(fieldName, values, 0, nil).SetItemPadding(1) default: // Add a no definition fields to the form if a a field within the // struct were missing an action above, so we can easily detect @@ -583,12 +553,6 @@ func (t *tui) messageSlide(app *tview.Application) tview.Primitive { m.Directory = &value case "FileName": m.FileName = &value - case "RelayViaNode": - v := Node(value) - m.RelayViaNode = &v - case "RelayReplyMethod": - v := Method(value) - m.RelayReplyMethod = &v default: fmt.Fprintf(p.logForm, "%v : error: did not find case definition for how to handle the \"%v\" within the switch statement\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), label) diff --git a/tui_msg.go b/tui_msg.go index cbb3e5c..b23dd28 100644 --- a/tui_msg.go +++ b/tui_msg.go @@ -15,6 +15,4 @@ type tuiMessage struct { ReplyMethodTimeout *int `json:"replyMethodTimeout,omitempty" yaml:"replyMethodTimeout,omitempty"` Directory *string `json:"directory,omitempty" yaml:"directory,omitempty"` FileName *string `json:"fileName,omitempty" yaml:"fileName,omitempty"` - RelayViaNode *Node `json:"relayViaNode,omitempty" yaml:"relayViaNode,omitempty"` - RelayReplyMethod *Method `json:"relayReplyMethod,omitempty" yaml:"relayReplyMethod,omitempty"` }