1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-05 06:46:48 +00:00

removed copyFile<from/to> and relay request types

This commit is contained in:
postmannen 2022-06-16 23:32:44 +02:00
parent a03ff25da4
commit b8a2d3d5e7
16 changed files with 5 additions and 826 deletions

View file

@ -67,7 +67,6 @@ ENV START_SUB_REQ_HTTP_GET ""
ENV START_SUB_REQ_HTTP_GET_SCHEDULED ""
ENV START_SUB_REQ_TAIL_FILE ""
ENV START_SUB_REQ_CLI_COMMAND_CONT ""
ENV START_SUB_REQ_RELAY ""
CMD ["ash","-c","env CONFIGFOLDER=./etc/ /app/steward\
-ringBufferSize=$RING_BUFFER_SIZE\
@ -107,8 +106,6 @@ CMD ["ash","-c","env CONFIGFOLDER=./etc/ /app/steward\
-startSubREQToFileAppend=$START_SUB_REQ_TO_FILE_APPEND\
-startSubREQToFile=$START_SUB_REQ_TO_FILE\
-startSubREQToFileNACK=$START_SUB_REQ_TO_FILE_NACK\
-startSubREQCopyFileFrom=$START_SUB_REQ_COPY_FILE_FROM\
-startSubREQCopyFileTo=$START_SUB_REQ_COPY_FILE_TO\
-startSubREQPing=$START_SUB_REQ_PING\
-startSubREQPong=$START_SUB_REQ_PONG\
-startSubREQCliCommand=$START_SUB_REQ_CLI_COMMAND\
@ -117,5 +114,4 @@ CMD ["ash","-c","env CONFIGFOLDER=./etc/ /app/steward\
-startSubREQHttpGetScheduled=$START_SUB_REQ_HTTP_GET_SCHEDULED\
-startSubREQTailFile=$START_SUB_REQ_TAIL_FILE\
-startSubREQCliCommandCont=$START_SUB_REQ_CLI_COMMAND_CONT\
-startSubREQRelay=$START_SUB_REQ_RELAY\
"]

124
README.md
View file

@ -29,13 +29,6 @@ As long as you can do something as an operator on in a shell on a system you can
- [Error messages from nodes](#error-messages-from-nodes)
- [Message handling and threads](#message-handling-and-threads)
- [Timeouts and retries for requests](#timeouts-and-retries-for-requests)
- [REQRelay](#reqrelay)
- [Relay Step 1](#relay-step-1)
- [Relay Step 2](#relay-step-2)
- [Relay Step 3](#relay-step-3)
- [Relay Step 4](#relay-step-4)
- [Relay Step 5](#relay-step-5)
- [Relay Step 6](#relay-step-6)
- [Flags and configuration file](#flags-and-configuration-file)
- [Schema for the messages to send into Steward via the API's](#schema-for-the-messages-to-send-into-steward-via-the-apis)
- [Nats messaging timeouts](#nats-messaging-timeouts)
@ -56,7 +49,6 @@ As long as you can do something as an operator on in a shell on a system you can
- [REQHttpGet](#reqhttpget)
- [REQHttpGetScheduled](#reqhttpgetscheduled)
- [REQHello](#reqhello)
- [REQCopyFileFrom](#reqcopyfilefrom)
- [REQCopySrc](#reqcopysrc)
- [REQErrorLog](#reqerrorlog)
- [Request Methods used for reply messages](#request-methods-used-for-reply-messages)
@ -285,75 +277,6 @@ In the above example, the values set meaning:
If no timeout are specified in a message the defaults specified in the **etc/config.yaml** are used.
#### REQRelay
Instead of injecting the new Requests on the central server, you can relay messages via another node as long as the nats-server authorization conf permits it. This is what REQRelay is for.
This functionality can be thought of as attaching a terminal to a Steward instance. Instead of injecting messages directly on for example the central Steward instance you can use another Steward instance and relay messages via the central instance.
Example configuration of relay authorization for nats-server can be found in the [doc folder](doc/).
Example:
```json
[
{
"directory":"/var/tail-logs/",
"fileName": "my-wifi.log",
"toNode": "node1",
"relayViaNode": "central",
"relayReplyMethod": "REQToConsole",
"methodArgs": ["bash","-c","tail -f /var/log/wifi.log"],
"method":"REQCliCommandCont",
"replyMethod":"REQToFileAppend",
"ACKTimeout":5,
"retries":3,
"replyACKTimeout":5,
"replyRetries":3,
"methodTimeout": 10
}
]
```
```text
1 2 3
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
------------▷│ │------------▷│ │------------▷│ │
│ node1 │ │ central │ │ node2 │
◁------------│ │◁------------│ │◁------------│ │
└─────────────┘ └─────────────┘ └─────────────┘
6 5 4
```
Steps Explained:
##### Relay Step 1
- The **relayViaNode** field of the message is checked, and If set the message will be encapsulated within a **REQRelayInitial** message where the original message field values are kept, and put back on the message queue on **node1**.
- The new **REQRelayInitial** message will again be picked up on **node1** and handled by the **REQRelayInitial handler**.
- The **REQRelayInitial handler** will set the message method to **REQRelay**, and forward the message to the node value in the **relayViaNode** field.
##### Relay Step 2
- On **central** the **REQRelay method handler** will recreate the **original** message, and forward it to **node2**.
##### Relay Step 3
- **Node2** receives the request, and executes the **original** method with the arguments specified.
##### Relay Step 4
- The result is sent back to **central** in the form of a normal reply message.
##### Relay Step 5
- When the **reply** message is received on central a copy of the **reply** message will be created , and forwarded to **node1** where it originated.
- The the **original reply method** `"replyMethod":"REQToFileAppend"` is handled on central.
##### Relay Step 6
- On **node1** the **relayReplyMethod** is checked for how to handle the message. In this case it is printed to the consoles STDOUT.
### Flags and configuration file
Steward supports both the use of flags with values set at startup, and the use of a config file.
@ -386,8 +309,6 @@ Steward supports both the use of flags with values set at startup, and the use o
- replyMethodTimeout : `int`
- directory : `string`
- fileName : `string`
- RelayViaNode: `string`
- RelayReplyMethod: `string`
### Nats messaging timeouts
@ -674,29 +595,6 @@ All nodes have the flag option to start sending Hello message to the central ser
]
```
#### REQCopyFileFrom
Copy a file from one node to another node.
- Source node to copy from is specified in the toNode/toNodes field
- The file to copy and the destination node is specified in the **methodArgs** field:
1. The first field is the full path of the source file.
2. The second field is the destination node for where to copy the file to.
3. The third field is the full path for where to write the copied file.
```json
[
{
"directory": "copy",
"fileName": "copy.log",
"toNodes": ["central"],
"method":"REQCopyFileFrom",
"methodArgs": ["./tmp2.txt","ship2","/tmp/tmp2.txt"],
"replyMethod":"REQToFileAppend"
}
]
```
#### REQCopySrc
Copy a file from one node to another node.
@ -1465,10 +1363,6 @@ StartSubREQToFileAppend bool
StartSubREQToFile bool
// Subscriber for writing to file without ACK
StartSubREQToFileNACK bool
// Subscriber for reading files to copy
StartSubREQCopyFileFrom bool
// Subscriber for writing copied files to disk
StartSubREQCopyFileTo bool
// Subscriber for Echo Request
StartSubREQPing bool
// Subscriber for Echo Reply
@ -1485,8 +1379,6 @@ StartSubREQHttpGetScheduled bool
StartSubREQTailFile bool
// Subscriber for continously delivery of output from cli commands.
StartSubREQCliCommandCont bool
// Subscriber for relay messages.
StartSubREQRelay bool
```
## Appendix-B
@ -1550,20 +1442,4 @@ FileName string `json:"fileName" yaml:"fileName"`
// generated and we also need a copy of the details of the the
// initial request message.
PreviousMessage *Message
// The node to relay the message via.
RelayViaNode Node `json:"relayViaNode" yaml:"relayViaNode"`
// The node where the relayed message originated, and where we want
// to send back the end result.
RelayFromNode Node `json:"relayFromNode" yaml:"relayFromNode"`
// The original value of the ToNode field of the original message.
RelayToNode Node `json:"relayToNode" yaml:"relayToNode"`
// The original method of the message.
RelayOriginalMethod Method `json:"relayOriginalMethod" yaml:"relayOriginalMethod"`
// The method to use when the reply of the relayed message came
// back to where originated from.
RelayReplyMethod Method `json:"relayReplyMethod" yaml:"relayReplyMethod"`
// done is used to signal when a message is fully processed.
// This is used for signaling back to the ringbuffer that we are
// done with processing a message, and the message can be removed
// from the ringbuffer and into the time series log.
```

View file

@ -110,10 +110,6 @@ type Configuration struct {
// Subscriber for writing to file without ACK
StartSubREQToFileNACK bool
// Subscriber for reading files to copy
StartSubREQCopyFileFrom bool
// Subscriber for writing copied files to disk
StartSubREQCopyFileTo bool
// Subscriber for reading files to copy
StartSubREQCopySrc bool
// Subscriber for writing copied files to disk
StartSubREQCopyDst bool
@ -133,8 +129,6 @@ type Configuration struct {
StartSubREQTailFile bool
// Subscriber for continously delivery of output from cli commands.
StartSubREQCliCommandCont bool
// Subscriber for relay messages.
StartSubREQRelay bool
}
// ConfigurationFromFile should have the same structure as
@ -186,8 +180,6 @@ type ConfigurationFromFile struct {
StartSubREQToFileAppend *bool
StartSubREQToFile *bool
StartSubREQToFileNACK *bool
StartSubREQCopyFileFrom *bool
StartSubREQCopyFileTo *bool
StartSubREQCopySrc *bool
StartSubREQCopyDst *bool
StartSubREQPing *bool
@ -198,7 +190,6 @@ type ConfigurationFromFile struct {
StartSubREQHttpGetScheduled *bool
StartSubREQTailFile *bool
StartSubREQCliCommandCont *bool
StartSubREQRelay *bool
}
// NewConfiguration will return a *Configuration.
@ -254,8 +245,6 @@ func newConfigurationDefaults() Configuration {
StartSubREQToFileAppend: true,
StartSubREQToFile: true,
StartSubREQToFileNACK: true,
StartSubREQCopyFileFrom: true,
StartSubREQCopyFileTo: true,
StartSubREQCopySrc: true,
StartSubREQCopyDst: true,
StartSubREQPing: true,
@ -266,7 +255,6 @@ func newConfigurationDefaults() Configuration {
StartSubREQHttpGetScheduled: true,
StartSubREQTailFile: true,
StartSubREQCliCommandCont: true,
StartSubREQRelay: false,
}
return c
}
@ -497,16 +485,6 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration {
} else {
conf.StartSubREQToFileNACK = *cf.StartSubREQToFileNACK
}
if cf.StartSubREQCopyFileFrom == nil {
conf.StartSubREQCopyFileFrom = cd.StartSubREQCopyFileFrom
} else {
conf.StartSubREQCopyFileFrom = *cf.StartSubREQCopyFileFrom
}
if cf.StartSubREQCopyFileTo == nil {
conf.StartSubREQCopyFileTo = cd.StartSubREQCopyFileTo
} else {
conf.StartSubREQCopyFileTo = *cf.StartSubREQCopyFileTo
}
if cf.StartSubREQCopySrc == nil {
conf.StartSubREQCopySrc = cd.StartSubREQCopySrc
} else {
@ -557,11 +535,6 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration {
} else {
conf.StartSubREQCliCommandCont = *cf.StartSubREQCliCommandCont
}
if cf.StartSubREQRelay == nil {
conf.StartSubREQRelay = cd.StartSubREQRelay
} else {
conf.StartSubREQRelay = *cf.StartSubREQRelay
}
return conf
}
@ -643,8 +616,6 @@ func (c *Configuration) CheckFlags() error {
flag.BoolVar(&c.StartSubREQToFileAppend, "startSubREQToFileAppend", fc.StartSubREQToFileAppend, "true/false")
flag.BoolVar(&c.StartSubREQToFile, "startSubREQToFile", fc.StartSubREQToFile, "true/false")
flag.BoolVar(&c.StartSubREQToFileNACK, "startSubREQToFileNACK", fc.StartSubREQToFileNACK, "true/false")
flag.BoolVar(&c.StartSubREQCopyFileFrom, "startSubREQCopyFileFrom", fc.StartSubREQCopyFileFrom, "true/false")
flag.BoolVar(&c.StartSubREQCopyFileTo, "startSubREQCopyFileTo", fc.StartSubREQCopyFileTo, "true/false")
flag.BoolVar(&c.StartSubREQCopySrc, "startSubREQCopySrc", fc.StartSubREQCopySrc, "true/false")
flag.BoolVar(&c.StartSubREQCopyDst, "startSubREQCopyDst", fc.StartSubREQCopyDst, "true/false")
flag.BoolVar(&c.StartSubREQPing, "startSubREQPing", fc.StartSubREQPing, "true/false")
@ -655,7 +626,6 @@ func (c *Configuration) CheckFlags() error {
flag.BoolVar(&c.StartSubREQHttpGetScheduled, "startSubREQHttpGetScheduled", fc.StartSubREQHttpGetScheduled, "true/false")
flag.BoolVar(&c.StartSubREQTailFile, "startSubREQTailFile", fc.StartSubREQTailFile, "true/false")
flag.BoolVar(&c.StartSubREQCliCommandCont, "startSubREQCliCommandCont", fc.StartSubREQCliCommandCont, "true/false")
flag.BoolVar(&c.StartSubREQRelay, "startSubREQRelay", fc.StartSubREQRelay, "true/false")
purgeBufferDB := flag.Bool("purgeBufferDB", false, "true/false, purge the incoming buffer db and all it's state")

View file

@ -68,14 +68,8 @@ func (e *errorKernel) start(ringBufferBulkInCh chan<- []subjectAndMessage) error
}
sendErrorOrInfo := func(errEvent errorEvent) {
var er string
// Decide what extra information to add to the error message.
switch {
case errEvent.message.RelayFromNode != "":
er = fmt.Sprintf("%v, node: %v, relayFromNode: %v, %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), errEvent.process.node, errEvent.message.RelayFromNode, errEvent.err)
default:
er = fmt.Sprintf("%v, node: %v, %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), errEvent.process.node, errEvent.err)
}
er := fmt.Sprintf("%v, node: %v, %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), errEvent.process.node, errEvent.err)
sam := subjectAndMessage{
Subject: newSubject(REQErrorLog, "errorCentral"),

View file

@ -73,21 +73,6 @@ type Message struct {
// initial request message.
PreviousMessage *Message
// The node to relay the message via.
RelayViaNode Node `json:"relayViaNode" yaml:"relayViaNode"`
// The original value of the RelayViaNode.
RelayOriginalViaNode Node `json:"relayOriginalViaNode" yaml:"relayOriginalViaNode"`
// The node where the relayed message originated, and where we want
// to send back the end result.
RelayFromNode Node `json:"relayFromNode" yaml:"relayFromNode"`
// The original value of the ToNode field of the original message.
RelayToNode Node `json:"relayToNode" yaml:"relayToNode"`
// The original method of the message.
RelayOriginalMethod Method `json:"relayOriginalMethod" yaml:"relayOriginalMethod"`
// The method to use when the reply of the relayed message came
// back to where originated from.
RelayReplyMethod Method `json:"relayReplyMethod" yaml:"relayReplyMethod"`
// done is used to signal when a message is fully processed.
// This is used for signaling back to the ringbuffer that we are
// done with processing a message, and the message can be removed

View file

@ -462,47 +462,6 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
}
}
// Send final reply for a relayed message back to the originating node.
//
// Check if the previous message was a relayed message, and if true
// make a copy of the current message where the to field is set to
// the value of the previous message's RelayFromNode field, so we
// also can send the a copy of the reply back to where it originated.
if message.PreviousMessage != nil && message.PreviousMessage.RelayOriginalViaNode != "" {
// make a copy of the message
msgCopy := message
msgCopy.ToNode = msgCopy.PreviousMessage.RelayFromNode
// We set the replyMethod of the initial message.
// If no RelayReplyMethod was found, we default to the reply
// method of the previous message.
switch {
case msgCopy.PreviousMessage.RelayReplyMethod == "":
er := fmt.Errorf("error: subscriberHandler: no PreviousMessage.RelayReplyMethod found, defaulting to the reply method of previous message: %v ", msgCopy)
p.errorKernel.errSend(p, message, er)
msgCopy.Method = msgCopy.PreviousMessage.ReplyMethod
case msgCopy.PreviousMessage.RelayReplyMethod != "":
msgCopy.Method = msgCopy.PreviousMessage.RelayReplyMethod
}
// Reset the previousMessage relay fields so the message don't loop.
message.PreviousMessage.RelayViaNode = ""
message.PreviousMessage.RelayOriginalViaNode = ""
// Create a SAM for the msg copy that will be sent back the where the
// relayed message originated from.
sam, err := newSubjectAndMessage(msgCopy)
if err != nil {
er := fmt.Errorf("error: subscriberHandler: newSubjectAndMessage : %v, message copy: %v", err, msgCopy)
p.errorKernel.errSend(p, message, er)
}
p.toRingbufferCh <- []subjectAndMessage{sam}
}
// Check if it is an ACK or NACK message, and do the appropriate action accordingly.
//
// With ACK messages Steward will keep the state of the message delivery, and try to

View file

@ -133,14 +133,6 @@ func (p *processes) Start(proc process) {
proc.startup.subREQToFileNACK(proc)
}
if proc.configuration.StartSubREQCopyFileFrom {
proc.startup.subREQCopyFileFrom(proc)
}
if proc.configuration.StartSubREQCopyFileTo {
proc.startup.subREQCopyFileTo(proc)
}
if proc.configuration.StartSubREQCopySrc {
proc.startup.subREQCopySrc(proc)
}
@ -232,12 +224,6 @@ func (p *processes) Start(proc process) {
proc.startup.subREQCliCommandCont(proc)
}
if proc.configuration.StartSubREQRelay {
proc.startup.subREQRelay(proc)
}
proc.startup.subREQRelayInitial(proc)
proc.startup.subREQPublicKey(proc)
}
@ -672,22 +658,6 @@ func (s startup) subREQToFileNACK(p process) {
go proc.spawnWorker()
}
func (s startup) subREQCopyFileFrom(p process) {
log.Printf("Starting copy file from subscriber: %#v\n", p.node)
sub := newSubject(REQCopyFileFrom, string(p.node))
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
go proc.spawnWorker()
}
func (s startup) subREQCopyFileTo(p process) {
log.Printf("Starting copy file to subscriber: %#v\n", p.node)
sub := newSubject(REQCopyFileTo, string(p.node))
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
go proc.spawnWorker()
}
func (s startup) subREQCopySrc(p process) {
log.Printf("Starting copy src subscriber: %#v\n", p.node)
sub := newSubject(REQCopySrc, string(p.node))
@ -728,23 +698,6 @@ func (s startup) subREQCliCommandCont(p process) {
go proc.spawnWorker()
}
func (s startup) subREQRelay(p process) {
nodeWithRelay := fmt.Sprintf("*.%v", p.node)
log.Printf("Starting Relay: %#v\n", nodeWithRelay)
sub := newSubject(REQRelay, string(nodeWithRelay))
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
go proc.spawnWorker()
}
func (s startup) subREQRelayInitial(p process) {
log.Printf("Starting Relay Initial: %#v\n", p.node)
sub := newSubject(REQRelayInitial, string(p.node))
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
go proc.spawnWorker()
}
func (s startup) subREQPublicKey(p process) {
log.Printf("Starting get Public Key subscriber: %#v\n", p.node)
sub := newSubject(REQPublicKey, string(p.node))

View file

@ -99,11 +99,6 @@ const (
REQToFile Method = "REQToFile"
// REQToFileNACK same as REQToFile but NACK.
REQToFileNACK Method = "REQToFileNACK"
// Read the source file to be copied to some node.
REQCopyFileFrom Method = "REQCopyFileFrom"
// Write the destination copied to some node.
REQCopyFileTo Method = "REQCopyFileTo"
// Initial request for file copying.
// Initiated by the user.
REQCopySrc Method = "REQCopySrc"
// Initial request for file copying.
@ -130,10 +125,6 @@ const (
REQHttpGetScheduled Method = "REQHttpGetScheduled"
// Tail file
REQTailFile Method = "REQTailFile"
// Write to steward socket
REQRelay Method = "REQRelay"
// The method handler for the first step in a relay chain.
REQRelayInitial Method = "REQRelayInitial"
// REQNone is used when there should be no reply.
REQNone Method = "REQNone"
// REQTest is used only for testing to be able to grab the output
@ -228,12 +219,6 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
REQToFileNACK: methodREQToFile{
event: EventNACK,
},
REQCopyFileFrom: methodREQCopyFileFrom{
event: EventACK,
},
REQCopyFileTo: methodREQCopyFileTo{
event: EventACK,
},
REQCopySrc: methodREQCopySrc{
event: EventACK,
},
@ -267,12 +252,6 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
REQTailFile: methodREQTailFile{
event: EventACK,
},
REQRelay: methodREQRelay{
event: EventACK,
},
REQRelayInitial: methodREQRelayInitial{
event: EventACK,
},
REQPublicKey: methodREQPublicKey{
event: EventACK,
},

View file

@ -159,7 +159,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
// Get the file permissions
fileInfo, err := os.Stat(SrcFilePath)
if err != nil {
// errCh <- fmt.Errorf("error: methodREQCopyFile: failed to open file: %v, %v", SrcFilePath, err)
// errCh <- fmt.Errorf("error: methodREQCopySrc: failed to open file: %v, %v", SrcFilePath, err)
log.Printf("error: copySrcSubProcFunc: failed to stat file: %v\n", err)
return
}
@ -361,7 +361,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
fh, err := os.Open(cia.SrcFilePath)
if err != nil {
// errCh <- fmt.Errorf("error: methodREQCopyFile: failed to open file: %v, %v", SrcFilePath, err)
// errCh <- fmt.Errorf("error: copySrcSubProcFunc: failed to open file: %v, %v", SrcFilePath, err)
log.Fatalf("error: copySrcSubProcFunc: failed to open file: %v\n", err)
return nil
}

View file

@ -1,13 +1,9 @@
package steward
import (
"context"
"fmt"
"io"
"os"
"path"
"path/filepath"
"sync"
"github.com/hpcloud/tail"
)
@ -114,255 +110,6 @@ func (m methodREQToFile) handler(proc process, message Message, node string) ([]
return ackMsg, nil
}
// ----
type methodREQCopyFileFrom struct {
event Event
}
func (m methodREQCopyFileFrom) getKind() Event {
return m.event
}
// Handle writing to a file. Will truncate any existing data if the file did already
// exist.
func (m methodREQCopyFileFrom) handler(proc process, message Message, node string) ([]byte, error) {
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
switch {
case len(message.MethodArgs) < 3:
er := fmt.Errorf("error: methodREQCopyFileFrom: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath")
proc.errorKernel.errSend(proc, message, er)
return
}
SrcFilePath := message.MethodArgs[0]
DstNode := message.MethodArgs[1]
DstFilePath := message.MethodArgs[2]
// Get a context with the timeout specified in message.MethodTimeout.
ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
defer cancel()
outCh := make(chan []byte)
errCh := make(chan error)
// Read the file, and put the result on the out channel to be sent when done reading.
proc.processes.wg.Add(1)
go copyFileFrom(ctx, &proc.processes.wg, SrcFilePath, errCh, outCh)
// Wait here until we got the data to send, then create a new message
// and send it.
// Also checking the ctx.Done which calls Cancel will allow us to
// kill all started go routines started by this message.
select {
case <-ctx.Done():
er := fmt.Errorf("error: methodREQCopyFile: got <-ctx.Done(): %v", message.MethodArgs)
proc.errorKernel.errSend(proc, message, er)
return
case er := <-errCh:
proc.errorKernel.errSend(proc, message, er)
return
case out := <-outCh:
dstDir := filepath.Dir(DstFilePath)
dstFile := filepath.Base(DstFilePath)
// Prepare for sending a new message with the output
// Copy the original message to get the defaults for timeouts etc,
// and set new values for fields to change.
msg := message
msg.ToNode = Node(DstNode)
//msg.Method = REQToFile
msg.Method = REQCopyFileTo
msg.Data = out
msg.Directory = dstDir
msg.FileName = dstFile
// Create SAM and put the message on the send new message channel.
sam, err := newSubjectAndMessage(msg)
if err != nil {
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
proc.errorKernel.errSend(proc, message, er)
}
proc.toRingbufferCh <- []subjectAndMessage{sam}
replyData := fmt.Sprintf("info: succesfully read the file %v, and sent the content to %v\n", SrcFilePath, DstNode)
newReplyMessage(proc, message, []byte(replyData))
}
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// copyFileFrom will read a file to be copied from the specified SrcFilePath.
// The result of be delivered on the provided outCh.
func copyFileFrom(ctx context.Context, wg *sync.WaitGroup, SrcFilePath string, errCh chan error, outCh chan []byte) {
defer wg.Done()
const natsMaxMsgSize = 1000000
fi, err := os.Stat(SrcFilePath)
// Check if the src file exists, and that it is not bigger than
// the default limit used by nats which is 1MB.
switch {
case os.IsNotExist(err):
errCh <- fmt.Errorf("error: methodREQCopyFile: src file not found: %v", SrcFilePath)
return
case fi.Size() > natsMaxMsgSize:
errCh <- fmt.Errorf("error: methodREQCopyFile: src file to big. max size: %v", natsMaxMsgSize)
return
}
fh, err := os.Open(SrcFilePath)
if err != nil {
errCh <- fmt.Errorf("error: methodREQCopyFile: failed to open file: %v, %v", SrcFilePath, err)
return
}
b, err := io.ReadAll(fh)
if err != nil {
errCh <- fmt.Errorf("error: methodREQCopyFile: failed to read file: %v, %v", SrcFilePath, err)
return
}
select {
case outCh <- b:
// fmt.Printf(" * DEBUG: after io.ReadAll: outCh <- b\n")
case <-ctx.Done():
return
}
}
// ----
type methodREQCopyFileTo struct {
event Event
}
func (m methodREQCopyFileTo) getKind() Event {
return m.event
}
// Handle writing to a file. Will truncate any existing data if the file did already
// exist.
// Same as the REQToFile, but this requst type don't use the default data folder path
// for where to store files or add information about node names.
// This method also sends a msgReply back to the publisher if the method was done
// successfully, where REQToFile do not.
// This method will truncate and overwrite any existing files.
func (m methodREQCopyFileTo) handler(proc process, message Message, node string) ([]byte, error) {
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
// Get a context with the timeout specified in message.MethodTimeout.
ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
defer cancel()
// Put data that should be the result of the action done in the inner
// go routine on the outCh.
outCh := make(chan []byte)
// Put errors from the inner go routine on the errCh.
errCh := make(chan error)
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
// ---
switch {
case len(message.MethodArgs) < 3:
er := fmt.Errorf("error: methodREQCopyFileTo: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath")
proc.errorKernel.errSend(proc, message, er)
return
}
// Pick up the values for the directory and filename for where
// to store the file.
DstFilePath := message.MethodArgs[2]
dstDir := filepath.Dir(DstFilePath)
dstFile := filepath.Base(DstFilePath)
fileRealPath := path.Join(dstDir, dstFile)
// Check if folder structure exist, if not create it.
if _, err := os.Stat(dstDir); os.IsNotExist(err) {
err := os.MkdirAll(dstDir, 0700)
if err != nil {
er := fmt.Errorf("failed to create toFile directory tree: subject:%v, folderTree: %v, %v", proc.subject, dstDir, err)
errCh <- er
return
}
{
er := fmt.Errorf("info: MethodREQCopyFileTo: Creating folders %v", dstDir)
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
}
}
// Open file and write data. Truncate and overwrite any existing files.
file := filepath.Join(dstDir, dstFile)
f, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
if err != nil {
er := fmt.Errorf("failed to open file, check that you've specified a value for fileName in the message: directory: %v, fileName: %v, %v", message.Directory, message.FileName, err)
errCh <- er
return
}
defer f.Close()
_, err = f.Write(message.Data)
f.Sync()
if err != nil {
er := fmt.Errorf("failed to write to file: file: %v, error: %v", file, err)
errCh <- er
}
// All went ok, send a signal to the outer select statement.
outCh <- []byte(fileRealPath)
// ---
}()
// Wait for messages received from the inner go routine.
select {
case <-ctx.Done():
er := fmt.Errorf("error: methodREQCopyFileTo: got <-ctx.Done(): %v", message.MethodArgs)
proc.errorKernel.errSend(proc, message, er)
return
case err := <-errCh:
er := fmt.Errorf("error: methodREQCopyFileTo: %v", err)
proc.errorKernel.errSend(proc, message, er)
return
case out := <-outCh:
replyData := fmt.Sprintf("info: succesfully created and wrote the file %v\n", out)
newReplyMessage(proc, message, []byte(replyData))
return
}
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// --- methodREQTailFile
type methodREQTailFile struct {

View file

@ -233,150 +233,6 @@ func (m methodREQPong) handler(proc process, message Message, node string) ([]by
// ---
type methodREQRelayInitial struct {
event Event
}
func (m methodREQRelayInitial) getKind() Event {
return m.event
}
// Handler to relay messages via a host.
func (m methodREQRelayInitial) handler(proc process, message Message, node string) ([]byte, error) {
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
// Get a context with the timeout specified in message.MethodTimeout.
ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
defer cancel()
outCh := make(chan []byte)
errCh := make(chan error)
nothingCh := make(chan struct{}, 1)
var out []byte
// If the actual Method for the message is REQCopyFileFrom we need to
// do the actual file reading here so we can fill the data field of the
// message with the content of the file before relaying it.
switch {
case message.RelayOriginalMethod == REQCopyFileFrom:
switch {
case len(message.MethodArgs) < 3:
er := fmt.Errorf("error: methodREQRelayInitial: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath")
proc.errorKernel.errSend(proc, message, er)
return
}
SrcFilePath := message.MethodArgs[0]
//DstFilePath := message.MethodArgs[2]
// Read the file, and put the result on the out channel to be sent when done reading.
proc.processes.wg.Add(1)
go copyFileFrom(ctx, &proc.processes.wg, SrcFilePath, errCh, outCh)
// Since we now have read the source file we don't need the REQCopyFileFrom
// request method anymore, so we change the original method of the message
// so it will write the data after the relaying.
//dstDir := filepath.Dir(DstFilePath)
//dstFile := filepath.Base(DstFilePath)
message.RelayOriginalMethod = REQCopyFileTo
//message.FileName = dstFile
//message.Directory = dstDir
default:
// No request type that need special handling if relayed, so we should signal that
// there is nothing to do for the select below.
// We need to do this signaling in it's own go routine here, so we don't block here
// since the select below is in the same function.
go func() {
nothingCh <- struct{}{}
}()
}
select {
case <-ctx.Done():
er := fmt.Errorf("error: methodREQRelayInitial: CopyFromFile: got <-ctx.Done(): %v", message.MethodArgs)
proc.errorKernel.errSend(proc, message, er)
return
case er := <-errCh:
proc.errorKernel.errSend(proc, message, er)
return
case <-nothingCh:
// Do nothing.
case out = <-outCh:
}
// relay the message to the actual host here by prefixing the the RelayToNode
// to the subject.
relayTo := fmt.Sprintf("%v.%v", message.RelayToNode, message.RelayOriginalViaNode)
// message.ToNode = message.RelayOriginalViaNode
message.ToNode = Node(relayTo)
message.FromNode = Node(node)
message.Method = REQRelay
message.Data = out
sam, err := newSubjectAndMessage(message)
if err != nil {
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
proc.errorKernel.errSend(proc, message, er)
}
proc.toRingbufferCh <- []subjectAndMessage{sam}
}()
// Send back an ACK message.
ackMsg := []byte("confirmed REQRelay from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ----
type methodREQRelay struct {
event Event
}
func (m methodREQRelay) getKind() Event {
return m.event
}
// Handler to relay messages via a host.
func (m methodREQRelay) handler(proc process, message Message, node string) ([]byte, error) {
// relay the message here to the actual host here.
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
message.ToNode = message.RelayToNode
message.FromNode = Node(node)
message.Method = message.RelayOriginalMethod
sam, err := newSubjectAndMessage(message)
if err != nil {
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
proc.errorKernel.errSend(proc, message, er)
return
}
select {
case proc.toRingbufferCh <- []subjectAndMessage{sam}:
case <-proc.ctx.Done():
}
}()
// Send back an ACK message.
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ---
type methodREQToConsole struct {
event Event
}

View file

@ -304,78 +304,6 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB
// it out of the K/V Store.
r.deleteKeyFromBucket(r.samValueBucket, strconv.Itoa(v.ID))
//m := v.Data.Message
//t := time.Now().Format("Mon Jan _2 15:04:05 2006")
//tmpout := os.Stdout
//_ = fmt.Sprintf("%v\n", t)
//_ = fmt.Sprintf("%v\n", m.ID)
//_ = fmt.Sprintf("%v\n", m.ToNode)
//_ = fmt.Sprintf("%v\n", m.ToNodes)
//_ = fmt.Sprintf("%v\n", m.Data)
//_ = fmt.Sprintf("%v\n", m.Method)
//_ = fmt.Sprintf("%v\n", m.MethodArgs)
//_ = fmt.Sprintf("%v\n", m.ArgSignature)
//_ = fmt.Sprintf("%v\n", m.ReplyMethod)
//_ = fmt.Sprintf("%v\n", m.ReplyMethodArgs)
//_ = fmt.Sprintf("%v\n", m.IsReply)
//_ = fmt.Sprintf("%v\n", m.FromNode)
//_ = fmt.Sprintf("%v\n", m.ACKTimeout)
//_ = fmt.Sprintf("%v\n", m.Retries)
//_ = fmt.Sprintf("%v\n", m.ReplyACKTimeout)
//_ = fmt.Sprintf("%v\n", m.ReplyRetries)
//_ = fmt.Sprintf("%v\n", m.MethodTimeout)
//_ = fmt.Sprintf("%v\n", m.ReplyMethodTimeout)
//_ = fmt.Sprintf("%v\n", m.Directory)
//_ = fmt.Sprintf("%v\n", m.FileName)
//_ = fmt.Sprintf("%v\n", m.PreviousMessage)
//_ = fmt.Sprintf("%v\n", m.RelayViaNode)
//_ = fmt.Sprintf("%v\n", m.RelayOriginalViaNode)
//_ = fmt.Sprintf("%v\n", m.RelayFromNode)
//_ = fmt.Sprintf("%v\n", m.RelayToNode)
//_ = fmt.Sprintf("%v\n", m.RelayOriginalMethod)
//_ = fmt.Sprintf("%v\n", m.RelayReplyMethod)
//_ = fmt.Sprintf("%v\n", m.done)
//str := fmt.Sprintln(
// t,
// m.ID,
// m.ToNode,
// m.ToNodes,
// m.Data,
// m.Method,
// m.MethodArgs,
// m.ArgSignature,
// m.ReplyMethod,
// m.ReplyMethodArgs,
// m.IsReply,
// m.FromNode,
// m.ACKTimeout,
// m.Retries,
// m.ReplyACKTimeout,
// m.ReplyRetries,
// m.MethodTimeout,
// m.ReplyMethodTimeout,
// m.Directory,
// m.FileName,
// m.PreviousMessage,
// m.RelayViaNode,
// m.RelayOriginalViaNode,
// m.RelayFromNode,
// m.RelayToNode,
// m.RelayOriginalMethod,
// m.RelayReplyMethod,
// m.done,
//)
//r.permStore <- fmt.Sprintf("%v\n", str)
// NB: Removed this one since it creates a data race with the storing of the hash value in
// the methodREQKeysDeliverUpdate. Sorted by splitting up the sprint below with the sprint
// above for now, but should investigate further what might be the case here, since the
// message have no reference to the proc and should in theory not create a race.
//
js, err := json.Marshal(msgForPermStore)
if err != nil {
er := fmt.Errorf("error:fillBuffer: json marshaling: %v", err)

View file

@ -46,5 +46,4 @@ START_SUB_REQ_TO_CONSOLE=true
START_SUB_REQ_HTTP_GET=true
START_SUB_REQ_HTTP_GET_SCHEDULED=true
START_SUB_REQ_TAIL_FILE=true
START_SUB_REQ_CLI_COMMAND_CONT=true
START_SUB_REQ_RELAY=true
START_SUB_REQ_CLI_COMMAND_CONT=true

View file

@ -461,31 +461,6 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
m := sam.Message
// Check if it is a relay message
if m.RelayViaNode != "" && m.RelayViaNode != Node(s.nodeName) {
// Keep the original values.
m.RelayFromNode = m.FromNode
m.RelayToNode = m.ToNode
m.RelayOriginalViaNode = m.RelayViaNode
m.RelayOriginalMethod = m.Method
// Convert it to a relay initial message.
m.Method = REQRelayInitial
// Set the toNode of the message to this host, so we send
// it to ourselves again and pick it up with the subscriber
// for the REQReplyInitial handler method.
m.ToNode = Node(s.nodeName)
// We are now done with the initial checking for if the new
// message is a relay message, so we empty the viaNode field
// so we don't end in an endless loop here.
// The value is stored in RelayOriginalViaNode for later use.
m.RelayViaNode = ""
sam.Subject = newSubject(REQRelayInitial, string(s.nodeName))
}
subjName := sam.Subject.name()
pn := processNameGet(subjName, processKindPublisher)

36
tui.go
View file

@ -338,37 +338,7 @@ func drawMessageInputFields(p slideMessageEdit, m tuiMessage) {
}
p.inputForm.AddInputField(fieldName, value, fieldWidth, nil, nil)
case "RelayViaNode":
// Get nodes from file.
values, err := getNodeNames("nodeslist.cfg")
if err != nil {
log.Printf("error: unable to open file: %v\n", err)
os.Exit(1)
}
if m.RelayViaNode != nil && *m.RelayViaNode != "" {
tmp := []string{string(*m.RelayViaNode)}
tmp = append(tmp, values...)
values = tmp
}
p.inputForm.AddDropDown(fieldName, values, 0, nil).SetItemPadding(1)
//c.msgForm.AddDropDown(mRefVal.Type().Field(i).Name, values, 0, nil).SetItemPadding(1)
case "RelayReplyMethod":
var v Method
rm := v.GetReplyMethods()
values := []string{}
for _, k := range rm {
values = append(values, string(k))
}
if m.RelayReplyMethod != nil && *m.RelayReplyMethod != "" {
tmp := []string{string(*m.RelayReplyMethod)}
tmp = append(tmp, values...)
values = tmp
}
p.inputForm.AddDropDown(fieldName, values, 0, nil).SetItemPadding(1)
default:
// Add a no definition fields to the form if a a field within the
// struct were missing an action above, so we can easily detect
@ -583,12 +553,6 @@ func (t *tui) messageSlide(app *tview.Application) tview.Primitive {
m.Directory = &value
case "FileName":
m.FileName = &value
case "RelayViaNode":
v := Node(value)
m.RelayViaNode = &v
case "RelayReplyMethod":
v := Method(value)
m.RelayReplyMethod = &v
default:
fmt.Fprintf(p.logForm, "%v : error: did not find case definition for how to handle the \"%v\" within the switch statement\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), label)

View file

@ -15,6 +15,4 @@ type tuiMessage struct {
ReplyMethodTimeout *int `json:"replyMethodTimeout,omitempty" yaml:"replyMethodTimeout,omitempty"`
Directory *string `json:"directory,omitempty" yaml:"directory,omitempty"`
FileName *string `json:"fileName,omitempty" yaml:"fileName,omitempty"`
RelayViaNode *Node `json:"relayViaNode,omitempty" yaml:"relayViaNode,omitempty"`
RelayReplyMethod *Method `json:"relayReplyMethod,omitempty" yaml:"relayReplyMethod,omitempty"`
}